Files
@ 4f610ef6768d
Branch filter:
Location: light9/light9/midifade/midifade.py
4f610ef6768d
5.7 KiB
text/x-python
write graph values of current fader page back out over midi
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | #!bin/python
"""
Read midi events, write fade levels to graph
"""
import asyncio
import logging
import traceback
from typing import Dict, List
import mido
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import Literal, URIRef
from light9 import networking
from light9.namespaces import L9
from light9.newtypes import decimalLiteral
from light9.run_local import log
from light9.showconfig import showUri
# Select the mido backend at import time, before main() opens any ports.
mido.set_backend('alsa_midi.mido_backend')
# reader() in main() sleeps 1/MAX_SEND_RATE seconds after each batch of
# messages, so at most this many batches are handled per second.
MAX_SEND_RATE = 20
# Written by setFader; read back through the getCurrentValue callback that
# main() hands to WriteBackFaders.
_lastSet = {}  # fader URIRef -> last strength passed to setFader
def setFader(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
    """Store `strength` in the graph as the :value of `fader`'s :setting.

    Args:
        graph: synced graph to read the fader's :setting from and patch.
        ctx: graph context the patched :value statement is written in.
        fader: URI of the fader whose setting should change.
        strength: new fader level; rounded to 3 decimal places for the graph.

    Raises:
        ValueError: if `fader` has no :setting in the graph.
    """
    # Fixed: the message used to be missing its closing parenthesis.
    log.info(f'setFader(fader={fader}, strength={strength:.03f})')
    valueLit = decimalLiteral(round(strength, 3))
    with graph.currentState() as g:
        fadeSet = g.value(fader, L9['setting'])
    if fadeSet is None:
        raise ValueError(f'fader {fader} has no :setting')
    graph.patchObject(ctx, fadeSet, L9['value'], valueLit)
    # Remember the unrounded strength so the write-back code can compare the
    # hardware position against the graph value.
    _lastSet[fader] = strength
def onMessage(graph: SyncedGraph, ctx: URIRef, m: Dict):
    """Apply one midi message dict to the graph.

    'active_sensing' messages are dropped silently. 'control_change' messages
    are mapped from (m['dev'], m['control']) to a fader URI, and m['value'] / 127
    is written to that fader via setFader. Every other message type is only
    logged.
    """
    kind = m['type']
    if kind == 'active_sensing':
        return
    if kind != 'control_change':
        log.info(f'unhandled message {m}')
        return

    try:
        faderByDevice = {
            # NOTE(review): all three quneo controls point at the same fader
            # URI (fadePage1f0) - confirm this is intended.
            'quneo': dict.fromkeys((44, 45, 46), L9['show/dance2023/fadePage1f0']),
            # bcf2000 controls 81..88 drive fader0..fader7.
            'bcf2000': {81 + i: L9[f'show/dance2023/fader{i}'] for i in range(8)},
        }
        fader = faderByDevice[m['dev']][m['control']]
    except KeyError:
        # Unknown device name or unmapped control number.
        log.info(f'unknown control {m}')
        return

    try:
        setFader(graph, ctx, fader, m['value'] / 127)
    except ValueError as e:
        # setFader raises ValueError when the fader has no :setting.
        log.warning(f'{e!r} - ignoring')
def reduceToLatestValue(ms: List[Dict]) -> List[Dict]:
    """Keep only the most recent message for each distinct message source.

    Two messages belong to the same source when every field except 'value'
    is equal (same type, device, control, ...). For each source, only the last
    message in `ms` is kept.

    Args:
        ms: message dicts in arrival order.

    Returns:
        A new list with one message per source, ordered by when each source
        first appeared in `ms`.
    """
    latest: Dict[tuple, Dict] = {}
    for m in ms:
        # The key must include the field *values*: keying on field names alone
        # would collapse messages from different controls into one.
        # List values are converted to tuples so the key is hashable.
        key = tuple(
            sorted(
                ((k, tuple(v) if isinstance(v, list) else v)
                 for k, v in m.items()
                 if k != 'value'),
                key=lambda kv: kv[0],
            ))
        latest[key] = m
    return list(latest.values())
class WriteBackFaders:
    """Send the graph's fader values for the mapped fader page out over midi."""

    def __init__(self, graph: SyncedGraph, bcf_out, getCurrentValue):
        """Remember the collaborators used by update().

        Args:
            graph: graph that the mapping and fader values are read from.
            bcf_out: output port object; its send() receives the midi messages.
            getCurrentValue: callable taking a fader URI and returning the
                level last seen from the hardware (multiplied by 127 here, so
                presumably in the 0..1 range - confirm with the caller).
        """
        self.graph = graph
        self.bcf_out = bcf_out
        self.getCurrentValue = getCurrentValue

    def update(self):
        """Run _update, logging a ValueError as a warning and not raising it.

        Other exceptions, including the NotImplementedError that _update
        raises for a non-bcf2000 mapping, propagate to the caller.
        """
        try:
            self._update()
        except ValueError as e:
            log.warning(repr(e))

    def _update(self):
        """Compare each mapped fader with the graph and send the ones that differ.

        Raises:
            ValueError: if there is no :midiControl :map mapping.
            NotImplementedError: if the mapping's :midiDev is not 'bcf2000'.
        """
        graph = self.graph
        mapping = graph.value(L9['midiControl'], L9['map'])
        if mapping is None:
            raise ValueError('no :midiControl :map ?mapping')

        ourDev = 'bcf2000'
        if graph.value(mapping, L9['midiDev']) != Literal(ourDev):
            raise NotImplementedError(f'need {mapping} to have :midiDev {ourDev!r}')

        fadePage = graph.value(mapping, L9['outputs'])
        faders = list(graph.objects(fadePage, L9.fader))
        sentCount = 0
        for fader in faders:
            column = int(graph.value(fader, L9.column).toPython())
            setting = graph.value(fader, L9.setting)
            # could split this to a separate handler per fader
            graphValue = graph.value(setting, L9.value).toPython()
            hwcurrent = int(self.getCurrentValue(fader) * 127)
            hwgoal = int(graphValue * 127)
            # Only send when the two differ by more than 5 midi steps.
            if abs(hwcurrent - hwgoal) <= 5:
                continue
            # The midi control number is the fader's column offset by 80.
            self.sendToBcf(column + 80, hwgoal)
            sentCount += 1
        log.info(f'updated {sentCount} of {len(faders)} connected faders')

    def sendToBcf(self, control, value):
        """Send one control_change message with `control` and `value` to bcf_out."""
        self.bcf_out.send(mido.Message('control_change', control=control, value=value))
async def main():
    """Bridge midi controllers and the graph; runs until the process is stopped.

    Opens every midi input whose name starts with Keystation, BCF2000 or QUNEO.
    Incoming messages are queued, reduced to the latest value per control and
    applied with onMessage. Graph changes are written back to the BCF2000
    through a WriteBackFaders handler.
    """
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
    logging.getLogger('graphedit').setLevel(logging.INFO)

    graph = SyncedGraph(networking.rdfdb.url, "midifade")
    ctx = URIRef(showUri() + '/fade')

    msgs = asyncio.Queue()
    # main() is a coroutine, so the running loop is the right one to capture.
    loop = asyncio.get_running_loop()

    def onMessageMidoThread(dev, message):
        # Port callbacks presumably run outside the event loop thread (hence
        # the name), so hand the message over with call_soon_threadsafe.
        loop.call_soon_threadsafe(msgs.put_nowait, message.dict() | {'dev': dev})

    async def reader():
        """Drain the queue in batches and apply the latest value per control."""
        while True:
            # Block for the first message, then take whatever else is queued.
            recents = [await msgs.get()]
            while not msgs.empty():
                recents.append(msgs.get_nowait())
            try:
                for msg in reduceToLatestValue(recents):
                    onMessage(graph, ctx, msg)
            except Exception:
                # Deliberate best-effort: one bad batch must not stop the reader.
                traceback.print_exc()
                log.warning("error in onMessage- continuing anyway")
            await asyncio.sleep(1 / MAX_SEND_RATE)

    # Keep a strong reference: the event loop only holds tasks weakly, so an
    # unreferenced task may be garbage-collected before it finishes.
    readerTask = asyncio.create_task(reader())  # noqa: F841

    # Hold on to the opened input ports for the lifetime of main().
    openPorts = []
    for inputName in mido.get_input_names():
        if inputName.startswith('Keystation'):
            dev = "keystation"
        elif inputName.startswith('BCF2000'):
            dev = 'bcf2000'
        elif inputName.startswith('QUNEO'):
            dev = 'quneo'
        else:
            continue
        log.info(f'listening on input {inputName} {dev=}')
        openPorts.append(mido.open_input(  #
            inputName,  #
            # dev=dev binds the current device name; a plain closure would see
            # only the last value of the loop variable.
            callback=lambda message, dev=dev: onMessageMidoThread(dev, message)))

    # NOTE(review): the output port name is hard-coded, including what looks
    # like a client:port suffix - confirm it is stable across machines.
    bcf_out = mido.open_output('BCF2000:BCF2000 MIDI 1 28:0')
    wb = WriteBackFaders(graph, bcf_out, getCurrentValue=lambda f: _lastSet.get(f, 0))
    graph.addHandler(wb.update)

    # Keep the coroutine (and so the event loop) alive.
    while True:
        await asyncio.sleep(1)
# Start the asyncio event loop with main() when this file is run as a script.
if __name__ == '__main__':
    asyncio.run(main())
|