diff --git a/src/light9/midifade/eventqueue.py b/src/light9/midifade/eventqueue.py new file mode 100644 --- /dev/null +++ b/src/light9/midifade/eventqueue.py @@ -0,0 +1,49 @@ +import asyncio +import logging +import traceback + +log = logging.getLogger() + + +class EventQueue: + """midi events come in fast; graph consumes them slower""" + + def __init__(self, MAX_SEND_RATE: float, onMessage) -> None: + self.MAX_SEND_RATE = MAX_SEND_RATE + self.onMessage = onMessage + self.msgs = asyncio.Queue() + + def callbackFor(self, dev): + mainThreadLoop = asyncio.get_running_loop() + + def cb(message): + # this is running in mido's thread + log.info(f'enqueue {message} {"*" * message.dict()["value"]}') + mainThreadLoop.call_soon_threadsafe( + self.msgs.put_nowait, + message.dict() | {'dev': dev}, + ) + + return cb + + async def run(self): + while True: + recents = [await self.msgs.get()] + while not self.msgs.empty(): + recents.append(self.msgs.get_nowait()) + try: + for msg in reduceToLatestValue(recents): + # log.info(f'handle {msg=}') + await self.onMessage(msg) + except Exception: + traceback.print_exc() + log.warning("error in onMessage- continuing anyway") + await asyncio.sleep(1 / self.MAX_SEND_RATE) + + +def reduceToLatestValue(ms: list[dict]) -> list[dict]: + merge = {} + for m in ms: + normal_key = tuple(sorted(dict((k, v) for k, v in m.items() if k != 'value'))) + merge[normal_key] = m + return list(merge.values()) diff --git a/src/light9/midifade/mididevs.py b/src/light9/midifade/mididevs.py new file mode 100644 --- /dev/null +++ b/src/light9/midifade/mididevs.py @@ -0,0 +1,40 @@ +import logging + +import mido +from light9.midifade.eventqueue import EventQueue +from light9.midifade.pages import Pages +from light9.midifade.writeback import WriteBackFaders +from rdfdb.syncedgraph.syncedgraph import SyncedGraph + +log = logging.getLogger() + +_csb=[] +openPorts = [] +def listenToMidiInputs(q: EventQueue): + """put midi events on EventQueue (presumably mido does this in a bg + thread)""" + + for inputName in mido.get_input_names(): # type: ignore + if inputName.startswith('Keystation'): + dev = "keystation" + elif inputName.startswith('BCF2000'): + dev = 'bcf2000' + elif inputName.startswith('QUNEO'): + dev = 'quneo' + else: + continue + log.info(f'listening on input {inputName} {dev=}') + cb = q.callbackFor(dev) + _csb.append(cb) + openPorts.append(mido.open_input( # type: ignore + inputName, # + callback=cb)) + + +def connectToMidiOutput(graph: SyncedGraph, pages: Pages, _lastSet: dict[int, int]): + for outputName in mido.get_output_names(): # type: ignore + if outputName.startswith('BCF2000'): + bcf_out = mido.open_output(outputName) # type: ignore + wb = WriteBackFaders(graph, pages, bcf_out, _lastSet) + graph.addHandler(wb.update) + break diff --git a/src/light9/midifade/midifade.py b/src/light9/midifade/midifade.py --- a/src/light9/midifade/midifade.py +++ b/src/light9/midifade/midifade.py @@ -1,4 +1,3 @@ -#!bin/python """ Read midi events, write fade levels to graph @@ -6,133 +5,66 @@ Device troubleshooting: amidi -l """ import asyncio +from functools import partial import logging -import traceback -from typing import Dict, List, cast -from light9.effect.edit import clamp import mido -from rdfdb.syncedgraph.syncedgraph import SyncedGraph -from rdflib import RDF, ConjunctiveGraph, Literal, URIRef -from rdfdb.syncedgraph.readonly_graph import ReadOnlyConjunctiveGraph from light9 import networking +from light9.effect.edit import clamp +from light9.midifade.eventqueue import EventQueue +from light9.midifade.mididevs import connectToMidiOutput, listenToMidiInputs +from light9.midifade.pages import Pages from light9.namespaces import L9 from light9.newtypes import decimalLiteral from light9.run_local import log from light9.showconfig import showUri - -mido.set_backend('alsa_midi.mido_backend') -MAX_SEND_RATE = 30 - -_lastSet = {} #midictlchannel:value7bit - -currentFaders = {} # midi control channel num : FaderUri -ctx = URIRef(showUri() + '/fade') - - -def compileCurrents(graph): - currentFaders.clear() - try: - new = getChansToFaders(graph) - except ValueError: - return # e.g. empty-graph startup - currentFaders.update(new) - - -def getGraphMappingNode(g: ReadOnlyConjunctiveGraph | SyncedGraph) -> URIRef: - mapping = g.value(L9['midiControl'], L9['map']) - if mapping is None: - raise ValueError('no :midiControl :map ?mapping') - midiDev = g.value(mapping, L9['midiDev']) - ourDev = 'bcf2000' - if midiDev != Literal(ourDev): - raise NotImplementedError(f'need {mapping} to have :midiDev {ourDev!r}') - return mapping - - -def getCurMappedPage(g: SyncedGraph): - mapping = getGraphMappingNode(g) - return g.value(mapping, L9['outputs']) +from rdfdb.syncedgraph.syncedgraph import SyncedGraph +from rdflib import URIRef -def setCurMappedPage(g: SyncedGraph, mapping: URIRef, newPage: URIRef): - g.patchObject(ctx, mapping, L9.outputs, newPage) +# inline of patchObject to make it async, not just create_task +async def asyncPatchObject(graph, context: URIRef, subject, predicate: URIRef, newObject): + p = graph.getObjectPatch(context, subject, predicate, newObject) + # if not p.isEmpty(): + # log.debug("patchObject %r" % p.jsonRepr) + await graph.patch(p) -def getChansToFaders(g: SyncedGraph) -> Dict[int, URIRef]: - fadePage = getCurMappedPage(g) - ret = [] - for f in g.objects(fadePage, L9.fader): - columnLit = cast(Literal, g.value(f, L9['column'])) - col = int(columnLit.toPython()) - ret.append((col, f)) - - ret.sort() - ctl_channels = list(range(81, 88 + 1)) - out = {} - for chan, (col, f) in zip(ctl_channels, ret): - out[chan] = f - return out - - -def changePage(g: SyncedGraph, dp: int): - """dp==-1, make the previous page active, etc. Writes to graph""" - - with g.currentState() as current: - allPages = sorted(current.subjects(RDF.type, L9.FadePage), key=lambda fp: str(fp)) - mapping = getGraphMappingNode(current) - curPage = current.value(mapping, L9.outputs) - if curPage is None: - curPage = allPages[0] - idx = allPages.index(curPage) - newIdx = clamp(idx + dp, 0, len(allPages) - 1) - print('change from ', idx, newIdx) - newPage = allPages[newIdx] - setCurMappedPage(g, mapping, newPage) - - -def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float): - log.info(f'setFader(fader={fader}, strength={strength:.03f}') +async def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float): + log.info(f'setFader(fader={fader}, strength={strength:.03f})') valueLit = decimalLiteral(round(strength, 3)) with graph.currentState() as g: fadeSet = g.value(fader, L9['setting']) if fadeSet is None: raise ValueError(f'fader {fader} has no :setting') - graph.patchObject(ctx, fadeSet, L9['value'], valueLit) + await asyncPatchObject(graph, ctx, fadeSet, L9['value'], valueLit) -def changeGrandMaster(graph: SyncedGraph, newValue: float): +def changeGrandMaster(graph: SyncedGraph, newValue: float, ctx: URIRef): graph.patchObject(ctx, L9.grandMaster, L9['value'], decimalLiteral(newValue)) -def onMessage(graph: SyncedGraph, ctx: URIRef, m: Dict): +async def onMessage(graph: SyncedGraph, pages: Pages, ctx: URIRef, _lastSet: dict[int, int], m: dict): if m['type'] == 'active_sensing': return if m['type'] == 'control_change': if m['dev'] == 'bcf2000' and m['control'] == 91: - changePage(graph, -1) + pages.changePage(-1) return if m['dev'] == 'bcf2000' and m['control'] == 92: - changePage(graph, 1) + pages.changePage(1) return if m['dev'] == 'bcf2000' and m['control'] == 8: - changeGrandMaster(graph, clamp(m['value'] / 127 * 1.5, 0, 1)) + changeGrandMaster(graph, clamp(m['value'] / 127 * 1.5, 0, 1), ctx) return try: - fader = { - 'quneo': { - 44: L9['show/dance2023/fadePage1f0'], - 45: L9['show/dance2023/fadePage1f0'], - 46: L9['show/dance2023/fadePage1f0'], - }, - 'bcf2000': currentFaders, - }[m['dev']][m['control']] + fader = pages.lookupFader(m['dev'], m['control']) except KeyError: log.info(f'unknown control {m}') return try: - writeHwValueToGraph(graph, ctx, fader, m['value'] / 127) + await writeHwValueToGraph(graph, ctx, fader, m['value'] / 127) _lastSet[m['control']] = m['value'] except ValueError as e: log.warning(f'{e!r} - ignoring') @@ -140,103 +72,26 @@ def onMessage(graph: SyncedGraph, ctx: U log.info(f'unhandled message {m}') -def reduceToLatestValue(ms: List[Dict]) -> List[Dict]: - merge = {} - for m in ms: - normal_key = tuple(sorted(dict((k, v) for k, v in m.items() if k != 'value'))) - merge[normal_key] = m - return merge.values() - - -class WriteBackFaders: - - def __init__(self, graph: SyncedGraph, bcf_out, getCurrentValue): - self.graph = graph - self.bcf_out = bcf_out - self.getCurrentValue = getCurrentValue - - def update(self): - try: - self._update() - except ValueError as e: - log.warning(repr(e)) - - def _update(self): - g = self.graph - nupdated = 0 - m = getChansToFaders(g) - for midi_ctl_addr, f in m.items(): - fset = g.value(f, L9.setting) - # could split this to a separate handler per fader - value = g.value(fset, L9.value).toPython() - hwcurrent = self.getCurrentValue(midi_ctl_addr) - hwgoal = int(value * 127) - print(f'{f} {hwcurrent=} {hwgoal=}') - if abs(hwcurrent - hwgoal) > 2: - self.sendToBcf(midi_ctl_addr, hwgoal) - nupdated += 1 - log.info(f'wrote to {nupdated} of {len(m)} mapped faders') - - def sendToBcf(self, control, value): - _lastSet[control] = value - msg = mido.Message('control_change', control=control, value=value) - self.bcf_out.send(msg) - - async def main(): logging.getLogger('autodepgraphapi').setLevel(logging.INFO) logging.getLogger('syncedgraph').setLevel(logging.INFO) logging.getLogger('graphedit').setLevel(logging.INFO) - graph = SyncedGraph(networking.rdfdb.url, "midifade") - ctx = URIRef(showUri() + '/fade') - - msgs = asyncio.Queue() - loop = asyncio.get_event_loop() - - def onMessageMidoThread(dev, message): - loop.call_soon_threadsafe(msgs.put_nowait, message.dict() | {'dev': dev}) + mido.set_backend('alsa_midi.mido_backend') - async def reader(): - while True: - recents = [await msgs.get()] - while not msgs.empty(): - recents.append(msgs.get_nowait()) - try: - for msg in reduceToLatestValue(recents): - onMessage(graph, ctx, msg) - except Exception as e: - traceback.print_exc() - log.warning("error in onMessage- continuing anyway") - await asyncio.sleep(1 / MAX_SEND_RATE) + MAX_SEND_RATE = 30 + + _lastSet = {} #midictlchannel:value7bit - asyncio.create_task(reader()) - openPorts = [] - for inputName in mido.get_input_names(): # type: ignore - if inputName.startswith('Keystation'): - dev = "keystation" - elif inputName.startswith('BCF2000'): - dev = 'bcf2000' - elif inputName.startswith('QUNEO'): - dev = 'quneo' - else: - continue - log.info(f'listening on input {inputName} {dev=}') - openPorts.append(mido.open_input( # type: ignore - inputName, # - callback=lambda message, dev=dev: onMessageMidoThread(dev, message))) + ctx = URIRef(showUri() + '/fade') + graph = SyncedGraph(networking.rdfdb.url, "midifade") + pages = Pages(graph, ctx) + queue = EventQueue(MAX_SEND_RATE, partial(onMessage, graph, pages, ctx, _lastSet)) + listenToMidiInputs(queue) + connectToMidiOutput(graph, pages, _lastSet) + graph.addHandler(pages.compileCurrents) - graph.addHandler(lambda: compileCurrents(graph)) - - for outputName in mido.get_output_names(): # type: ignore - if outputName.startswith('BCF2000'): - bcf_out = mido.open_output(outputName) # type: ignore - wb = WriteBackFaders(graph, bcf_out, getCurrentValue=lambda f: _lastSet.get(f, 0)) - graph.addHandler(wb.update) - break - - while True: - await asyncio.sleep(1) + await queue.run() if __name__ == '__main__': diff --git a/src/light9/midifade/pages.py b/src/light9/midifade/pages.py new file mode 100644 --- /dev/null +++ b/src/light9/midifade/pages.py @@ -0,0 +1,88 @@ +import logging +from typing import cast + +from light9.effect.edit import clamp +from light9.namespaces import L9 +from rdfdb.syncedgraph.readonly_graph import ReadOnlyConjunctiveGraph +from rdfdb.syncedgraph.syncedgraph import SyncedGraph +from rdflib import RDF, Literal, URIRef + +log = logging.getLogger() + + +class Pages: + """converts between fader numbers and FaderUri, which is a mapping that can + be changed by moving between pages""" + + def __init__(self, graph: SyncedGraph, ctx: URIRef): + self.graph = graph + self.ctx = ctx + + self.currentFaders = {} # midi control channel num : FaderUri + + def getChansToFaders(self) -> dict[int, URIRef]: + fadePage = self.getCurMappedPage() + ret = [] + for f in self.graph.objects(fadePage, L9.fader): + columnLit = cast(Literal, self.graph.value(f, L9['column'])) + col = int(columnLit.toPython()) + ret.append((col, f)) + + ret.sort() + ctl_channels = list(range(81, 88 + 1)) + out = {} + for chan, (col, f) in zip(ctl_channels, ret): + out[chan] = f + return out + + def changePage(self, dp: int): + """dp==-1, make the previous page active, etc. Writes to graph""" + + with self.graph.currentState() as current: + allPages = sorted(current.subjects(RDF.type, L9.FadePage), key=lambda fp: str(fp)) + mapping = self.getGraphMappingNode(current) + curPage = current.value(mapping, L9.outputs) + if curPage is None: + curPage = allPages[0] + idx = allPages.index(curPage) + newIdx = clamp(idx + dp, 0, len(allPages) - 1) + log.info(f'change from {idx} {newIdx}') + newPage = allPages[newIdx] + self.setCurMappedPage(mapping, newPage) + + def getCurMappedPage(self) -> URIRef: + mapping = self.getGraphMappingNode(self.graph) + ret = self.graph.value(mapping, L9['outputs']) + assert ret is not None + return ret + + def setCurMappedPage(self, mapping: URIRef, newPage: URIRef): + self.graph.patchObject(self.ctx, mapping, L9.outputs, newPage) + + def getGraphMappingNode(self, g: ReadOnlyConjunctiveGraph | SyncedGraph) -> URIRef: + mapping = g.value(L9['midiControl'], L9['map']) + if mapping is None: + raise ValueError('no :midiControl :map ?mapping') + midiDev = g.value(mapping, L9['midiDev']) + ourDev = 'bcf2000' + if midiDev != Literal(ourDev): + raise NotImplementedError(f'need {mapping} to have :midiDev {ourDev!r}') + return mapping + + def compileCurrents(self): + self.currentFaders.clear() + try: + new = self.getChansToFaders() + except ValueError: + return # e.g. empty-graph startup + self.currentFaders.update(new) + + def lookupFader(self, dev: str, control: int) -> URIRef: + return { + 'quneo': { + 44: L9['show/dance2023/fadePage1f0'], + 45: L9['show/dance2023/fadePage1f0'], + 46: L9['show/dance2023/fadePage1f0'], + }, + 'bcf2000': self.currentFaders, + }[dev][control] diff --git a/src/light9/midifade/writeback.py b/src/light9/midifade/writeback.py new file mode 100644 --- /dev/null +++ b/src/light9/midifade/writeback.py @@ -0,0 +1,59 @@ +import asyncio +import logging +from dataclasses import dataclass +from typing import cast + +import mido +from light9.midifade.pages import Pages +from light9.namespaces import L9 +from rdfdb.syncedgraph.syncedgraph import SyncedGraph +from rdflib import Literal +from debouncer import DebounceOptions, debounce + +log = logging.getLogger() + + +@dataclass +class WriteBackFaders: + graph: SyncedGraph + pages: Pages + bcf_out: mido.ports.BaseOutput + _lastSet: dict[int, int] + + def getCurrentValue(self, f): + return self._lastSet.get(f, 0) + + def update(self): + try: + asyncio.create_task(self._update()) + except ValueError as e: + log.warning(repr(e)) + + async def _update(self): + # to make this work again: + # - track the goal of all sliders + # - in a debounced handler, sendToBcf + return + g = self.graph + nupdated = 0 + m = self.pages.getChansToFaders() + for midi_ctl_addr, f in m.items(): + fset = g.value(f, L9.setting) + # could split this to a separate handler per fader + value = cast(Literal, g.value(fset, L9.value)).toPython() + hwcurrent = self.getCurrentValue(midi_ctl_addr) + hwgoal = int(value * 127) + maxValueDiff = 3 + # todo: 3 is no good; we should correct after a short time of no + # movement + if abs(hwcurrent - hwgoal) > maxValueDiff: + log.info(f'writing back to {f} {hwcurrent=} {hwgoal=}') + self.sendToBcf(midi_ctl_addr, hwgoal) + nupdated += 1 + if nupdated > 0: + log.info(f'wrote to {nupdated} of {len(m)} mapped faders') + + def sendToBcf(self, control: int, value: int): + self._lastSet[control] = value + msg = mido.Message('control_change', control=control, value=value) + self.bcf_out.send(msg)