Changeset - 3cd80b266561
[Not reviewed]
default
0 1 4
drewp@bigasterisk.com - 8 months ago 2024-05-14 04:39:53
drewp@bigasterisk.com
refactor midiFade a lot; try to catch all midi events even better,
even if rdfdb runs slow
5 files changed with 271 insertions and 180 deletions:
0 comments (0 inline, 0 general)
src/light9/midifade/eventqueue.py
Show inline comments
 
new file 100644
 
import asyncio
 
import logging
 
import traceback
 

	
 
log = logging.getLogger()
 

	
 

	
 
class EventQueue:
 
    """midi events come in fast; graph consumes them slower"""
 

	
 
    def __init__(self, MAX_SEND_RATE: float, onMessage) -> None:
 
        self.MAX_SEND_RATE = MAX_SEND_RATE
 
        self.onMessage = onMessage
 
        self.msgs = asyncio.Queue()
 

	
 
    def callbackFor(self, dev):
 
        mainThreadLoop = asyncio.get_running_loop()
 

	
 
        def cb(message):
 
            # this is running in mido's thread
 
            log.info(f'enqueue {message} {"*" * message.dict()["value"]}')
 
            mainThreadLoop.call_soon_threadsafe(
 
                self.msgs.put_nowait,
 
                message.dict() | {'dev': dev},
 
            )
 

	
 
        return cb
 

	
 
    async def run(self):
 
        while True:
 
            recents = [await self.msgs.get()]
 
            while not self.msgs.empty():
 
                recents.append(self.msgs.get_nowait())
 
            try:
 
                for msg in reduceToLatestValue(recents):
 
                    # log.info(f'handle {msg=}')
 
                    await self.onMessage(msg)
 
            except Exception:
 
                traceback.print_exc()
 
                log.warning("error in onMessage- continuing anyway")
 
            await asyncio.sleep(1 / self.MAX_SEND_RATE)
 

	
 

	
 
def reduceToLatestValue(ms: list[dict]) -> list[dict]:
 
    merge = {}
 
    for m in ms:
 
        normal_key = tuple(sorted(dict((k, v) for k, v in m.items() if k != 'value')))
 
        merge[normal_key] = m
 
    return list(merge.values())
src/light9/midifade/mididevs.py
Show inline comments
 
new file 100644
 
import logging
 

	
 
import mido
 
from light9.midifade.eventqueue import EventQueue
 
from light9.midifade.pages import Pages
 
from light9.midifade.writeback import WriteBackFaders
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 

	
 
log = logging.getLogger()
 

	
 
_csb=[]
 
openPorts = []
 
def listenToMidiInputs(q: EventQueue):
 
    """put midi events on EventQueue (presumably mido does this in a bg
 
    thread)"""
 
    
 
    for inputName in mido.get_input_names():  # type: ignore
 
        if inputName.startswith('Keystation'):
 
            dev = "keystation"
 
        elif inputName.startswith('BCF2000'):
 
            dev = 'bcf2000'
 
        elif inputName.startswith('QUNEO'):
 
            dev = 'quneo'
 
        else:
 
            continue
 
        log.info(f'listening on input {inputName} {dev=}')
 
        cb = q.callbackFor(dev)
 
        _csb.append(cb)
 
        openPorts.append(mido.open_input(  # type: ignore
 
            inputName,  #
 
            callback=cb))
 

	
 

	
 
def connectToMidiOutput(graph: SyncedGraph, pages: Pages, _lastSet: dict[int, int]):
 
    for outputName in mido.get_output_names():  # type: ignore
 
        if outputName.startswith('BCF2000'):
 
            bcf_out = mido.open_output(outputName)  # type: ignore
 
            wb = WriteBackFaders(graph, pages, bcf_out, _lastSet)
 
            graph.addHandler(wb.update)
 
            break
src/light9/midifade/midifade.py
Show inline comments
 
#!bin/python
 
"""
 
Read midi events, write fade levels to graph
 

	
 
@@ -6,133 +5,66 @@ Device troubleshooting:
 
    amidi -l
 
"""
 
import asyncio
 
from functools import partial
 
import logging
 
import traceback
 
from typing import Dict, List, cast
 
from light9.effect.edit import clamp
 

	
 
import mido
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import RDF, ConjunctiveGraph, Literal, URIRef
 
from rdfdb.syncedgraph.readonly_graph import ReadOnlyConjunctiveGraph
 
from light9 import networking
 
from light9.effect.edit import clamp
 
from light9.midifade.eventqueue import EventQueue
 
from light9.midifade.mididevs import connectToMidiOutput, listenToMidiInputs
 
from light9.midifade.pages import Pages
 
from light9.namespaces import L9
 
from light9.newtypes import decimalLiteral
 
from light9.run_local import log
 
from light9.showconfig import showUri
 

	
 
mido.set_backend('alsa_midi.mido_backend')
 
MAX_SEND_RATE = 30
 

	
 
_lastSet = {}  #midictlchannel:value7bit
 

	
 
currentFaders = {}  # midi control channel num : FaderUri
 
ctx = URIRef(showUri() + '/fade')
 

	
 

	
 
def compileCurrents(graph):
 
    currentFaders.clear()
 
    try:
 
        new = getChansToFaders(graph)
 
    except ValueError:
 
        return  # e.g. empty-graph startup
 
    currentFaders.update(new)
 

	
 

	
 
def getGraphMappingNode(g: ReadOnlyConjunctiveGraph | SyncedGraph) -> URIRef:
 
    mapping = g.value(L9['midiControl'], L9['map'])
 
    if mapping is None:
 
        raise ValueError('no :midiControl :map ?mapping')
 
    midiDev = g.value(mapping, L9['midiDev'])
 
    ourDev = 'bcf2000'
 
    if midiDev != Literal(ourDev):
 
        raise NotImplementedError(f'need {mapping} to have :midiDev {ourDev!r}')
 
    return mapping
 

	
 

	
 
def getCurMappedPage(g: SyncedGraph):
 
    mapping = getGraphMappingNode(g)
 
    return g.value(mapping, L9['outputs'])
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import URIRef
 

	
 

	
 
def setCurMappedPage(g: SyncedGraph, mapping: URIRef, newPage: URIRef):
 
    g.patchObject(ctx, mapping, L9.outputs, newPage)
 
# inline of patchObject to make it async, not just create_task
 
async def asyncPatchObject(graph, context: URIRef, subject, predicate: URIRef, newObject):
 
    p = graph.getObjectPatch(context, subject, predicate, newObject)
 
    # if not p.isEmpty():
 
    #     log.debug("patchObject %r" % p.jsonRepr)
 
    await graph.patch(p)
 

	
 

	
 
def getChansToFaders(g: SyncedGraph) -> Dict[int, URIRef]:
 
    fadePage = getCurMappedPage(g)
 
    ret = []
 
    for f in g.objects(fadePage, L9.fader):
 
        columnLit = cast(Literal, g.value(f, L9['column']))
 
        col = int(columnLit.toPython())
 
        ret.append((col, f))
 

	
 
    ret.sort()
 
    ctl_channels = list(range(81, 88 + 1))
 
    out = {}
 
    for chan, (col, f) in zip(ctl_channels, ret):
 
        out[chan] = f
 
    return out
 

	
 

	
 
def changePage(g: SyncedGraph, dp: int):
 
    """dp==-1, make the previous page active, etc. Writes to graph"""
 

	
 
    with g.currentState() as current:
 
        allPages = sorted(current.subjects(RDF.type, L9.FadePage), key=lambda fp: str(fp))
 
        mapping = getGraphMappingNode(current)
 
        curPage = current.value(mapping, L9.outputs)
 
    if curPage is None:
 
        curPage = allPages[0]
 
    idx = allPages.index(curPage)
 
    newIdx = clamp(idx + dp, 0, len(allPages) - 1)
 
    print('change from ', idx, newIdx)
 
    newPage = allPages[newIdx]
 
    setCurMappedPage(g, mapping, newPage)
 

	
 

	
 
def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
 
    log.info(f'setFader(fader={fader}, strength={strength:.03f}')
 
async def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
 
    log.info(f'setFader(fader={fader}, strength={strength:.03f})')
 
    valueLit = decimalLiteral(round(strength, 3))
 
    with graph.currentState() as g:
 
        fadeSet = g.value(fader, L9['setting'])
 
    if fadeSet is None:
 
        raise ValueError(f'fader {fader} has no :setting')
 
    graph.patchObject(ctx, fadeSet, L9['value'], valueLit)
 
    await asyncPatchObject(graph, ctx, fadeSet, L9['value'], valueLit)
 

	
 

	
 
def changeGrandMaster(graph: SyncedGraph, newValue: float):
 
def changeGrandMaster(graph: SyncedGraph, newValue: float, ctx: URIRef):
 
    graph.patchObject(ctx, L9.grandMaster, L9['value'], decimalLiteral(newValue))
 

	
 

	
 
def onMessage(graph: SyncedGraph, ctx: URIRef, m: Dict):
 
async def onMessage(graph: SyncedGraph, pages: Pages, ctx: URIRef, _lastSet: dict[int, int], m: dict):
 
    if m['type'] == 'active_sensing':
 
        return
 
    if m['type'] == 'control_change':
 
        if m['dev'] == 'bcf2000' and m['control'] == 91:
 
            changePage(graph, -1)
 
            pages.changePage(-1)
 
            return
 
        if m['dev'] == 'bcf2000' and m['control'] == 92:
 
            changePage(graph, 1)
 
            pages.changePage(1)
 
            return
 
        if m['dev'] == 'bcf2000' and m['control'] == 8:
 
            changeGrandMaster(graph, clamp(m['value'] / 127 * 1.5, 0, 1))
 
            changeGrandMaster(graph, clamp(m['value'] / 127 * 1.5, 0, 1), ctx)
 
            return
 

	
 
        try:
 
            fader = {
 
                'quneo': {
 
                    44: L9['show/dance2023/fadePage1f0'],
 
                    45: L9['show/dance2023/fadePage1f0'],
 
                    46: L9['show/dance2023/fadePage1f0'],
 
                },
 
                'bcf2000': currentFaders,
 
            }[m['dev']][m['control']]
 
            fader = pages.lookupFader(m['dev'], m['control'])
 
        except KeyError:
 
            log.info(f'unknown control {m}')
 
            return
 
        try:
 
            writeHwValueToGraph(graph, ctx, fader, m['value'] / 127)
 
            await writeHwValueToGraph(graph, ctx, fader, m['value'] / 127)
 
            _lastSet[m['control']] = m['value']
 
        except ValueError as e:
 
            log.warning(f'{e!r} - ignoring')
 
@@ -140,103 +72,26 @@ def onMessage(graph: SyncedGraph, ctx: U
 
        log.info(f'unhandled message {m}')
 

	
 

	
 
def reduceToLatestValue(ms: List[Dict]) -> List[Dict]:
 
    merge = {}
 
    for m in ms:
 
        normal_key = tuple(sorted(dict((k, v) for k, v in m.items() if k != 'value')))
 
        merge[normal_key] = m
 
    return merge.values()
 

	
 

	
 
class WriteBackFaders:
 

	
 
    def __init__(self, graph: SyncedGraph, bcf_out, getCurrentValue):
 
        self.graph = graph
 
        self.bcf_out = bcf_out
 
        self.getCurrentValue = getCurrentValue
 

	
 
    def update(self):
 
        try:
 
            self._update()
 
        except ValueError as e:
 
            log.warning(repr(e))
 

	
 
    def _update(self):
 
        g = self.graph
 
        nupdated = 0
 
        m = getChansToFaders(g)
 
        for midi_ctl_addr, f in m.items():
 
            fset = g.value(f, L9.setting)
 
            # could split this to a separate handler per fader
 
            value = g.value(fset, L9.value).toPython()
 
            hwcurrent = self.getCurrentValue(midi_ctl_addr)
 
            hwgoal = int(value * 127)
 
            print(f'{f} {hwcurrent=} {hwgoal=}')
 
            if abs(hwcurrent - hwgoal) > 2:
 
                self.sendToBcf(midi_ctl_addr, hwgoal)
 
                nupdated += 1
 
        log.info(f'wrote to {nupdated} of {len(m)} mapped faders')
 

	
 
    def sendToBcf(self, control, value):
 
        _lastSet[control] = value
 
        msg = mido.Message('control_change', control=control, value=value)
 
        self.bcf_out.send(msg)
 

	
 

	
 
async def main():
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('graphedit').setLevel(logging.INFO)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "midifade")
 
    ctx = URIRef(showUri() + '/fade')
 

	
 
    msgs = asyncio.Queue()
 
    loop = asyncio.get_event_loop()
 

	
 
    def onMessageMidoThread(dev, message):
 
        loop.call_soon_threadsafe(msgs.put_nowait, message.dict() | {'dev': dev})
 
    mido.set_backend('alsa_midi.mido_backend')
 

	
 
    async def reader():
 
        while True:
 
            recents = [await msgs.get()]
 
            while not msgs.empty():
 
                recents.append(msgs.get_nowait())
 
            try:
 
                for msg in reduceToLatestValue(recents):
 
                    onMessage(graph, ctx, msg)
 
            except Exception as e:
 
                traceback.print_exc()
 
                log.warning("error in onMessage- continuing anyway")
 
            await asyncio.sleep(1 / MAX_SEND_RATE)
 
    MAX_SEND_RATE = 30
 

	
 
    _lastSet = {}  #midictlchannel:value7bit
 

	
 
    asyncio.create_task(reader())
 
    openPorts = []
 
    for inputName in mido.get_input_names():  # type: ignore
 
        if inputName.startswith('Keystation'):
 
            dev = "keystation"
 
        elif inputName.startswith('BCF2000'):
 
            dev = 'bcf2000'
 
        elif inputName.startswith('QUNEO'):
 
            dev = 'quneo'
 
        else:
 
            continue
 
        log.info(f'listening on input {inputName} {dev=}')
 
        openPorts.append(mido.open_input(  # type: ignore
 
            inputName,  #
 
            callback=lambda message, dev=dev: onMessageMidoThread(dev, message)))
 
    ctx = URIRef(showUri() + '/fade')
 
    graph = SyncedGraph(networking.rdfdb.url, "midifade")
 
    pages = Pages(graph, ctx)
 
    queue = EventQueue(MAX_SEND_RATE, partial(onMessage, graph, pages, ctx, _lastSet))
 
    listenToMidiInputs(queue)
 
    connectToMidiOutput(graph, pages, _lastSet)
 
    graph.addHandler(pages.compileCurrents)
 

	
 
    graph.addHandler(lambda: compileCurrents(graph))
 

	
 
    for outputName in mido.get_output_names():  # type: ignore
 
        if outputName.startswith('BCF2000'):
 
            bcf_out = mido.open_output(outputName)  # type: ignore
 
            wb = WriteBackFaders(graph, bcf_out, getCurrentValue=lambda f: _lastSet.get(f, 0))
 
            graph.addHandler(wb.update)
 
            break
 

	
 
    while True:
 
        await asyncio.sleep(1)
 
    await queue.run()
 

	
 

	
 
if __name__ == '__main__':
src/light9/midifade/pages.py
Show inline comments
 
new file 100644
 
import logging
 
from typing import cast
 

	
 
from light9.effect.edit import clamp
 
from light9.namespaces import L9
 
from rdfdb.syncedgraph.readonly_graph import ReadOnlyConjunctiveGraph
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import RDF, Literal, URIRef
 

	
 
log = logging.getLogger()
 

	
 

	
 
class Pages:
 
    """converts between fader numbers and FaderUri, which is a mapping that can
 
    be changed by moving between pages"""
 

	
 
    def __init__(self, graph: SyncedGraph, ctx: URIRef):
 
        self.graph = graph
 
        self.ctx = ctx
 

	
 
        self.currentFaders = {}  # midi control channel num : FaderUri
 

	
 
    def getChansToFaders(self) -> dict[int, URIRef]:
 
        fadePage = self.getCurMappedPage()
 
        ret = []
 
        for f in self.graph.objects(fadePage, L9.fader):
 
            columnLit = cast(Literal, self.graph.value(f, L9['column']))
 
            col = int(columnLit.toPython())
 
            ret.append((col, f))
 

	
 
        ret.sort()
 
        ctl_channels = list(range(81, 88 + 1))
 
        out = {}
 
        for chan, (col, f) in zip(ctl_channels, ret):
 
            out[chan] = f
 
        return out
 

	
 
    def changePage(self, dp: int):
 
        """dp==-1, make the previous page active, etc. Writes to graph"""
 

	
 
        with self.graph.currentState() as current:
 
            allPages = sorted(current.subjects(RDF.type, L9.FadePage), key=lambda fp: str(fp))
 
            mapping = self.getGraphMappingNode(current)
 
            curPage = current.value(mapping, L9.outputs)
 
        if curPage is None:
 
            curPage = allPages[0]
 
        idx = allPages.index(curPage)
 
        newIdx = clamp(idx + dp, 0, len(allPages) - 1)
 
        log.info(f'change from {idx} {newIdx}')
 
        newPage = allPages[newIdx]
 
        self.setCurMappedPage(mapping, newPage)
 

	
 
    def getCurMappedPage(self) -> URIRef:
 
        mapping = self.getGraphMappingNode(self.graph)
 
        ret = self.graph.value(mapping, L9['outputs'])
 
        assert ret is not None
 
        return ret
 

	
 
    def setCurMappedPage(self, mapping: URIRef, newPage: URIRef):
 
        self.graph.patchObject(self.ctx, mapping, L9.outputs, newPage)
 

	
 
    def getGraphMappingNode(self, g: ReadOnlyConjunctiveGraph | SyncedGraph) -> URIRef:
 
        mapping = g.value(L9['midiControl'], L9['map'])
 
        if mapping is None:
 
            raise ValueError('no :midiControl :map ?mapping')
 
        midiDev = g.value(mapping, L9['midiDev'])
 
        ourDev = 'bcf2000'
 
        if midiDev != Literal(ourDev):
 
            raise NotImplementedError(f'need {mapping} to have :midiDev {ourDev!r}')
 
        return mapping
 

	
 
    def compileCurrents(self):
 
        self.currentFaders.clear()
 
        try:
 
            new = self.getChansToFaders()
 
        except ValueError:
 
            return  # e.g. empty-graph startup
 
        self.currentFaders.update(new)
 

	
 
    def lookupFader(self, dev: str, control: int) -> URIRef:
 
        return {
 
            'quneo': {
 
                44: L9['show/dance2023/fadePage1f0'],
 
                45: L9['show/dance2023/fadePage1f0'],
 
                46: L9['show/dance2023/fadePage1f0'],
 
            },
 
            'bcf2000': self.currentFaders,
 
        }[dev][control]
src/light9/midifade/writeback.py
Show inline comments
 
new file 100644
 
import asyncio
 
import logging
 
from dataclasses import dataclass
 
from typing import cast
 

	
 
import mido
 
from light9.midifade.pages import Pages
 
from light9.namespaces import L9
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import Literal
 
from debouncer import DebounceOptions, debounce
 

	
 
log = logging.getLogger()
 

	
 

	
 
@dataclass
 
class WriteBackFaders:
 
    graph: SyncedGraph
 
    pages: Pages
 
    bcf_out: mido.ports.BaseOutput
 
    _lastSet: dict[int, int]
 

	
 
    def getCurrentValue(self, f):
 
        return self._lastSet.get(f, 0)
 

	
 
    def update(self):
 
        try:
 
            asyncio.create_task(self._update())
 
        except ValueError as e:
 
            log.warning(repr(e))
 

	
 
    async def _update(self):
 
        # to make this work again:
 
        #  - track the goal of all sliders
 
        #  - in a debounced handler, sendToBcf
 
        return
 
        g = self.graph
 
        nupdated = 0
 
        m = self.pages.getChansToFaders()
 
        for midi_ctl_addr, f in m.items():
 
            fset = g.value(f, L9.setting)
 
            # could split this to a separate handler per fader
 
            value = cast(Literal, g.value(fset, L9.value)).toPython()
 
            hwcurrent = self.getCurrentValue(midi_ctl_addr)
 
            hwgoal = int(value * 127)
 
            maxValueDiff = 3
 
            # todo: 3 is no good; we should correct after a short time of no
 
            # movement
 
            if abs(hwcurrent - hwgoal) > maxValueDiff:
 
                log.info(f'writing back to {f} {hwcurrent=} {hwgoal=}')
 
                self.sendToBcf(midi_ctl_addr, hwgoal)
 
                nupdated += 1
 
        if nupdated > 0:
 
            log.info(f'wrote to {nupdated} of {len(m)} mapped faders')
 

	
 
    def sendToBcf(self, control: int, value: int):
 
        self._lastSet[control] = value
 
        msg = mido.Message('control_change', control=control, value=value)
 
        self.bcf_out.send(msg)
0 comments (0 inline, 0 general)