"""
Read MIDI events and write fader levels to the graph.

Device troubleshooting: run `amidi -l` to list the available hardware MIDI ports.
"""
import asyncio
import logging
from functools import partial
import mido
from light9 import networking
from light9.effect.edit import clamp
from light9.midifade.eventqueue import Event, EventQueue
from light9.midifade.mididevs import connectToMidiOutput, listenToMidiInputs
from light9.midifade.pages import Pages
from light9.namespaces import L9
from light9.newtypes import decimalLiteral
from light9.run_local import log
from light9.showconfig import showUri
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef
# inline of patchObject to make it async, not just create_task
async def asyncPatchObject(graph, context: URIRef, subject, predicate: URIRef, newObject):
    """Build an object patch for (subject, predicate) and await its application.

    The patch is produced by ``graph.getObjectPatch(context, subject,
    predicate, newObject)`` and handed directly to ``graph.patch``; this
    coroutine returns only once that awaited call has completed.
    """
    await graph.patch(graph.getObjectPatch(context, subject, predicate, newObject))
async def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
    """Write a hardware fader level onto the fader's setting node.

    ``strength`` is rounded to 3 decimal places, wrapped with
    ``decimalLiteral`` and patched in as the ``L9['value']`` of the node found
    through ``fader``'s ``L9['setting']``, using context ``ctx``.

    Raises:
        ValueError: if ``fader`` has no ``L9['setting']`` in the current
            graph state.
    """
    log.info(f'setFader(fader={fader}, strength={strength:.03f})')
    level = decimalLiteral(round(strength, 3))

    # Only the lookup needs the graph snapshot; the patch happens afterwards.
    with graph.currentState() as snapshot:
        settingNode = snapshot.value(fader, L9['setting'])

    if settingNode is None:
        raise ValueError(f'fader {fader} has no :setting')

    await asyncPatchObject(graph, ctx, settingNode, L9['value'], level)
def changeGrandMaster(graph: SyncedGraph, newValue: float, ctx: URIRef):
    """Patch ``L9.grandMaster``'s ``L9['value']`` to ``newValue`` in ``ctx``.

    The value is wrapped with ``decimalLiteral`` and sent through the
    synchronous ``graph.patchObject`` call; nothing is returned.
    """
    masterLevel = decimalLiteral(newValue)
    graph.patchObject(ctx, L9.grandMaster, L9['value'], masterLevel)
async def onMessage(graph: SyncedGraph, pages: Pages, ctx: URIRef, _lastSet: dict[int, int], m: Event):
    """Dispatch a single queued MIDI event.

    * ``active_sensing`` messages are ignored without logging.
    * Any other non-``control_change`` message is logged as unhandled.
    * On device ``bcf2000``: control 91 steps the page by -1, control 92 steps
      it by +1, and control 8 sets the grand master.
    * Every remaining control is resolved with ``pages.lookupFader``; its
      value divided by 127 is written to the graph, and on success the raw
      value is remembered in ``_lastSet`` under the control number.

    Unknown controls (``KeyError`` from the lookup) and faders without a
    setting (``ValueError`` from the write) are logged and otherwise ignored.
    """
    kind = m.type
    if kind == 'active_sensing':
        return
    if kind != 'control_change':
        log.info(f'unhandled message {m}')
        return

    if m.dev == 'bcf2000':
        control = m.control
        pageSteps = {91: -1, 92: 1}
        if control in pageSteps:
            pages.changePage(pageSteps[control])
            return
        if control == 8:
            # Scaling by 1.5 before clamping to [0, 1] means the grand master
            # hits full once the raw value passes roughly two thirds of 127.
            changeGrandMaster(graph, clamp(m.value / 127 * 1.5, 0, 1), ctx)
            return

    try:
        fader = pages.lookupFader(m.dev, m.control)
    except KeyError:
        log.info(f'unknown control {m}')
        return

    try:
        await writeHwValueToGraph(graph, ctx, fader, m.value / 127)
        _lastSet[m.control] = m.value
    except ValueError as err:
        log.warning(f'{err!r} - ignoring')
async def main():
    """Wire MIDI input and output to the synced graph and run the event queue.

    Sets three library loggers to INFO, selects the ALSA mido backend, builds
    the graph, pages and rate-limited event queue, hooks up the MIDI inputs
    and output, registers the pages handler on the graph, and then awaits
    ``queue.run()``.
    """
    for loggerName in ('autodepgraphapi', 'syncedgraph', 'graphedit'):
        logging.getLogger(loggerName).setLevel(logging.INFO)

    mido.set_backend('alsa_midi.mido_backend')

    # First argument to EventQueue; presumably messages per second -- confirm
    # against EventQueue.
    maxSendRate = 50

    # Maps MIDI control number -> last 7-bit value written. The same dict is
    # given to both the input handler and the MIDI output connection.
    lastSet = {}

    fadeCtx = URIRef(showUri() + '/fade')
    graph = SyncedGraph(networking.rdfdb.url, "midifade")
    pages = Pages(graph, fadeCtx)

    handleEvent = partial(onMessage, graph, pages, fadeCtx, lastSet)
    queue = EventQueue(maxSendRate, handleEvent)

    listenToMidiInputs(queue)
    connectToMidiOutput(graph, pages, lastSet)
    graph.addHandler(pages.compileCurrents)
    # todo: serve fps metrics, at least
    await queue.run()
# Script entry point: run main() on a fresh asyncio event loop.
if __name__ == '__main__':
    asyncio.run(main())