Files @ d1f86109e3cc
Branch filter:

Location: light9/src/light9/midifade/midifade.py

drewp@bigasterisk.com
more *value getter variants
"""
Read midi events, write fade levels to graph

Device troubleshooting:
    amidi -l
"""
import asyncio
import logging
from functools import partial

import mido
from light9 import networking
from light9.effect.edit import clamp
from light9.midifade.eventqueue import Event, EventQueue
from light9.midifade.mididevs import connectToMidiOutput, listenToMidiInputs
from light9.midifade.pages import Pages
from light9.namespaces import L9
from light9.newtypes import decimalLiteral
from light9.run_local import log
from light9.showconfig import showUri
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef


# inline of patchObject to make it async, not just create_task
async def asyncPatchObject(graph, context: URIRef, subject, predicate: URIRef, newObject):
    p = graph.getObjectPatch(context, subject, predicate, newObject)
    # if not p.isEmpty():
    #     log.debug("patchObject %r" % p.jsonRepr)
    await graph.patch(p)


async def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
    log.info(f'setFader(fader={fader}, strength={strength:.03f})')
    valueLit = decimalLiteral(round(strength, 3))
    with graph.currentState() as g:
        fadeSet = g.value(fader, L9['setting'])
    if fadeSet is None:
        raise ValueError(f'fader {fader} has no :setting')
    await asyncPatchObject(graph, ctx, fadeSet, L9['value'], valueLit)


def changeGrandMaster(graph: SyncedGraph, newValue: float, ctx: URIRef):
    graph.patchObject(ctx, L9.grandMaster, L9['value'], decimalLiteral(newValue))


async def onMessage(graph: SyncedGraph, pages: Pages, ctx: URIRef, _lastSet: dict[int, int], m: Event):
    if m.type == 'active_sensing':
        return
    if m.type == 'control_change':
        if m.dev == 'bcf2000' and m.control == 91:
            pages.changePage(-1)
            return
        if m.dev == 'bcf2000' and m.control == 92:
            pages.changePage(1)
            return
        if m.dev == 'bcf2000' and m.control == 8:
            changeGrandMaster(graph, clamp(m.value / 127 * 1.5, 0, 1), ctx)
            return

        try:
            fader = pages.lookupFader(m.dev, m.control)
        except KeyError:
            log.info(f'unknown control {m}')
            return
        try:
            await writeHwValueToGraph(graph, ctx, fader, m.value / 127)
            _lastSet[m.control] = m.value
        except ValueError as e:
            log.warning(f'{e!r} - ignoring')
    else:
        log.info(f'unhandled message {m}')


async def main():
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
    logging.getLogger('graphedit').setLevel(logging.INFO)

    mido.set_backend('alsa_midi.mido_backend')

    MAX_SEND_RATE = 50

    _lastSet = {}  #midictlchannel:value7bit

    ctx = URIRef(showUri() + '/fade')
    graph = SyncedGraph(networking.rdfdb.url, "midifade")
    pages = Pages(graph, ctx)
    queue = EventQueue(MAX_SEND_RATE, partial(onMessage, graph, pages, ctx, _lastSet))
    listenToMidiInputs(queue)
    connectToMidiOutput(graph, pages, _lastSet)
    graph.addHandler(pages.compileCurrents)
    # todo: serve fps metrics, at least
    await queue.run()


if __name__ == '__main__':
    asyncio.run(main())