Files @ 63f82ec045bf
Branch filter:

Location: light9/light9/midifade/midifade.py

drewp@bigasterisk.com
don't stop midi input on a handler error
#!bin/python
"""
Read midi events, write fade levels to graph
"""
import asyncio
import logging
import traceback

import mido
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef

from light9 import networking
from light9.namespaces import L9
from light9.newtypes import decimalLiteral
from light9.run_local import log
from light9.showconfig import showUri

# Select the 'alsa_midi.mido_backend' backend module for mido. This runs at
# import time, i.e. before main() opens the input port.
mido.set_backend('alsa_midi.mido_backend')
# Upper bound on how many times per second the reader loop in main() hands
# queued midi input to onMessage: it sleeps 1 / MAX_SEND_RATE seconds after
# each batch.
MAX_SEND_RATE = 20


def setFader(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
    """Store `strength` as the value of the setting attached to `fader`.

    Looks up the node linked from `fader` by L9['setting'] in the graph's
    current state, then patches that node's L9['value'] (in context `ctx`)
    to `strength` rounded to 3 decimal places.

    Args:
        graph: synced graph to read the fader's setting from and to patch.
        ctx: graph context passed through to `graph.patchObject`.
        fader: URI of the fader whose setting value should change.
        strength: new fader level; callers in this file pass 0..1.

    Raises:
        ValueError: if `fader` has no L9['setting'] in the graph, since
            there is then no node whose value could be patched.
    """
    log.info(f'setFader(fader={fader}, strength={strength:.03f})')
    valueLit = decimalLiteral(round(strength, 3))
    settingPred = L9['setting']
    with graph.currentState() as g:
        fadeSet = g.value(fader, settingPred)
    if fadeSet is None:
        # Without this check a None subject would be handed to patchObject,
        # producing either an obscure error or a bogus patch.
        raise ValueError(f'fader {fader} has no {settingPred} in the graph')
    graph.patchObject(ctx, fadeSet, L9['value'], valueLit)


def onMessage(graph: SyncedGraph, ctx: URIRef, m: mido.Message):
    """Apply one incoming midi message to the graph.

    - 'active_sensing' messages are ignored silently.
    - Control-change messages for control 17 set the
      L9['show/dance2023/fadePage1f3'] fader to value/127; control changes
      for any other control are ignored.
    - Every other message is logged as unhandled.
    """
    fields = m.dict()

    if fields['type'] == 'active_sensing':
        return

    if not m.is_cc():
        log.info(f'unhandled message {fields}')
        return

    if fields['control'] == 17:
        level = fields['value'] / 127
        setFader(graph, ctx, L9['show/dance2023/fadePage1f3'], level)


def _latestPerControl(messages):
    """Reduce a batch of midi messages to the newest one per control.

    'active_sensing' messages are dropped. The remaining messages are keyed
    by (type, channel, control); only the most recent message for each key
    is kept, and the result is ordered by arrival of those survivors.
    Fields a message type does not have count as None in the key.
    """
    newest = {}
    for msg in messages:
        md = msg.dict()
        if md['type'] == 'active_sensing':
            continue
        key = (md['type'], md.get('channel'), md.get('control'))
        # Remove any older entry first so dict order follows the arrival
        # order of the messages that survive.
        newest.pop(key, None)
        newest[key] = msg
    return list(newest.values())


async def main() -> None:
    """Forward midi input to the graph until cancelled.

    Opens the Keystation midi input with a callback that queues each message
    onto the asyncio loop. A reader task drains the queue in batches, at
    most MAX_SEND_RATE batches per second, and passes the newest message per
    control to onMessage. Errors raised by onMessage are reported and do not
    stop midi input.
    """
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
    logging.getLogger('graphedit').setLevel(logging.INFO)

    graph = SyncedGraph(networking.rdfdb.url, "midifade")
    ctx = URIRef(showUri() + '/fade')

    msgs: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    def onMessageMidoThread(message):
        # Given to mido as the input callback. asyncio.Queue is not
        # thread-safe, so the put is scheduled onto the loop rather than
        # done directly from the calling thread.
        loop.call_soon_threadsafe(msgs.put_nowait, message)

    async def reader():
        while True:
            # Block for the first message, then take whatever else has
            # already queued up so a fast-moving fader is coalesced.
            batch = [await msgs.get()]
            while not msgs.empty():
                batch.append(msgs.get_nowait())
            # Dispatch the newest message per control rather than only the
            # last message overall; otherwise a trailing active_sensing (or
            # other unrelated) message would discard the final fader value.
            for message in _latestPerControl(batch):
                try:
                    onMessage(graph, ctx, message)
                except Exception:
                    traceback.print_exc()
                    log.warning("error in onMessage- continuing anyway")
            await asyncio.sleep(1 / MAX_SEND_RATE)

    # Keep a reference: the event loop only holds weak references to tasks,
    # so an unreferenced task can be garbage-collected mid-execution.
    readerTask = asyncio.create_task(reader())

    port = mido.open_input('Keystation:Keystation MIDI 1 20:0', callback=onMessageMidoThread)
    try:
        # reader() loops forever, so this waits until main() is cancelled or
        # the reader fails unexpectedly (in which case the error propagates
        # instead of leaving a process that silently does nothing).
        await readerTask
    finally:
        readerTask.cancel()
        port.close()


# Script entry point: run main() on a fresh asyncio event loop.
if __name__ == '__main__':
    asyncio.run(main())