Files
@ 4f610ef6768d
Branch filter:
Location: light9/light9/midifade/midifade.py
4f610ef6768d
5.7 KiB
text/x-python
write graph values of current fader page back out over midi
| #!bin/python
"""
Read midi events, write fade levels to graph
"""
import asyncio
import logging
import traceback
from typing import Dict, List
import mido
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import Literal, URIRef
from light9 import networking
from light9.namespaces import L9
from light9.newtypes import decimalLiteral
from light9.run_local import log
from light9.showconfig import showUri
mido.set_backend('alsa_midi.mido_backend')
MAX_SEND_RATE = 20
_lastSet = {} #Fader:value
def setFader(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
log.info(f'setFader(fader={fader}, strength={strength:.03f}')
valueLit = decimalLiteral(round(strength, 3))
with graph.currentState() as g:
fadeSet = g.value(fader, L9['setting'])
if fadeSet is None:
raise ValueError(f'fader {fader} has no :setting')
graph.patchObject(ctx, fadeSet, L9['value'], valueLit)
_lastSet[fader] = strength
def onMessage(graph: SyncedGraph, ctx: URIRef, m: Dict):
if m['type'] == 'active_sensing':
return
if m['type'] == 'control_change':
try:
fader = {
'quneo': {
44: L9['show/dance2023/fadePage1f0'],
45: L9['show/dance2023/fadePage1f0'],
46: L9['show/dance2023/fadePage1f0'],
},
'bcf2000': {
81: L9['show/dance2023/fader0'],
82: L9['show/dance2023/fader1'],
83: L9['show/dance2023/fader2'],
84: L9['show/dance2023/fader3'],
85: L9['show/dance2023/fader4'],
86: L9['show/dance2023/fader5'],
87: L9['show/dance2023/fader6'],
88: L9['show/dance2023/fader7'],
}
}[m['dev']][m['control']]
except KeyError:
log.info(f'unknown control {m}')
return
try:
setFader(graph, ctx, fader, m['value'] / 127)
except ValueError as e:
log.warning(f'{e!r} - ignoring')
else:
log.info(f'unhandled message {m}')
def reduceToLatestValue(ms: List[Dict]) -> List[Dict]:
merge = {}
for m in ms:
normal_key = tuple(sorted(dict((k, v) for k, v in m.items() if k != 'value')))
merge[normal_key] = m
return merge.values()
class WriteBackFaders:
def __init__(self, graph: SyncedGraph, bcf_out, getCurrentValue):
self.graph = graph
self.bcf_out = bcf_out
self.getCurrentValue = getCurrentValue
def update(self):
try:
self._update()
except ValueError as e:
log.warning(repr(e))
def _update(self):
g = self.graph
mapping = g.value(L9['midiControl'], L9['map'])
if mapping is None:
raise ValueError('no :midiControl :map ?mapping')
midiDev = g.value(mapping, L9['midiDev'])
ourDev = 'bcf2000'
if midiDev != Literal(ourDev):
raise NotImplementedError(f'need {mapping} to have :midiDev {ourDev!r}')
fadePage = g.value(mapping, L9['outputs'])
nupdated=0
faders = list(g.objects(fadePage, L9.fader))
for f in faders:
column = int(g.value(f, L9.column).toPython())
fset = g.value(f, L9.setting)
# could split this to a separate handler per fader
value = g.value(fset, L9.value).toPython()
hwcurrent = int(self.getCurrentValue(f) * 127)
hwgoal = int(value * 127)
if abs(hwcurrent - hwgoal) > 5:
midi_ctl_addr = column + 80
self.sendToBcf(midi_ctl_addr, hwgoal)
nupdated+=1
log.info(f'updated {nupdated} of {len(faders)} connected faders')
def sendToBcf(self, control, value):
msg = mido.Message('control_change', control=control, value=value)
self.bcf_out.send(msg)
async def main():
logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
logging.getLogger('syncedgraph').setLevel(logging.INFO)
logging.getLogger('graphedit').setLevel(logging.INFO)
graph = SyncedGraph(networking.rdfdb.url, "midifade")
ctx = URIRef(showUri() + '/fade')
msgs = asyncio.Queue()
loop = asyncio.get_event_loop()
def onMessageMidoThread(dev, message):
loop.call_soon_threadsafe(msgs.put_nowait, message.dict() | {'dev': dev})
async def reader():
while True:
recents = [await msgs.get()]
while not msgs.empty():
recents.append(msgs.get_nowait())
try:
for msg in reduceToLatestValue(recents):
onMessage(graph, ctx, msg)
except Exception as e:
traceback.print_exc()
log.warning("error in onMessage- continuing anyway")
await asyncio.sleep(1 / MAX_SEND_RATE)
asyncio.create_task(reader())
openPorts = []
for inputName in mido.get_input_names():
if inputName.startswith('Keystation'):
dev = "keystation"
elif inputName.startswith('BCF2000'):
dev = 'bcf2000'
elif inputName.startswith('QUNEO'):
dev = 'quneo'
else:
continue
log.info(f'listening on input {inputName} {dev=}')
openPorts.append(mido.open_input( #
inputName, #
callback=lambda message, dev=dev: onMessageMidoThread(dev, message)))
bcf_out = mido.open_output('BCF2000:BCF2000 MIDI 1 28:0')
wb = WriteBackFaders(graph, bcf_out, getCurrentValue=lambda f: _lastSet.get(f, 0))
graph.addHandler(wb.update)
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
|