Mercurial > code > home > repos > light9
view src/light9/midifade/midifade.py @ 2390:485148ef5686
reformat
author | drewp@bigasterisk.com |
---|---|
date | Tue, 14 May 2024 12:17:55 -0700 |
parents | 0e90dd50e8c4 |
children | bbff83207963 |
line wrap: on
line source
""" Read midi events, write fade levels to graph Device troubleshooting: amidi -l """ import asyncio import logging from functools import partial import mido from light9 import networking from light9.effect.edit import clamp from light9.midifade.eventqueue import Event, EventQueue from light9.midifade.mididevs import connectToMidiOutput, listenToMidiInputs from light9.midifade.pages import Pages from light9.namespaces import L9 from light9.newtypes import decimalLiteral from light9.run_local import log from light9.showconfig import showUri from rdfdb.syncedgraph.syncedgraph import SyncedGraph from rdflib import URIRef # inline of patchObject to make it async, not just create_task async def asyncPatchObject(graph, context: URIRef, subject, predicate: URIRef, newObject): p = graph.getObjectPatch(context, subject, predicate, newObject) # if not p.isEmpty(): # log.debug("patchObject %r" % p.jsonRepr) await graph.patch(p) async def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float): log.info(f'setFader(fader={fader}, strength={strength:.03f})') valueLit = decimalLiteral(round(strength, 3)) with graph.currentState() as g: fadeSet = g.value(fader, L9['setting']) if fadeSet is None: raise ValueError(f'fader {fader} has no :setting') await asyncPatchObject(graph, ctx, fadeSet, L9['value'], valueLit) def changeGrandMaster(graph: SyncedGraph, newValue: float, ctx: URIRef): graph.patchObject(ctx, L9.grandMaster, L9['value'], decimalLiteral(newValue)) async def onMessage(graph: SyncedGraph, pages: Pages, ctx: URIRef, _lastSet: dict[int, int], m: Event): if m.type == 'active_sensing': return if m.type == 'control_change': if m.dev == 'bcf2000' and m.control == 91: pages.changePage(-1) return if m.dev == 'bcf2000' and m.control == 92: pages.changePage(1) return if m.dev == 'bcf2000' and m.control == 8: changeGrandMaster(graph, clamp(m.value / 127 * 1.5, 0, 1), ctx) return try: fader = pages.lookupFader(m.dev, m.control) except KeyError: log.info(f'unknown control {m}') return try: await writeHwValueToGraph(graph, ctx, fader, m.value / 127) _lastSet[m.control] = m.value except ValueError as e: log.warning(f'{e!r} - ignoring') else: log.info(f'unhandled message {m}') async def main(): logging.getLogger('autodepgraphapi').setLevel(logging.INFO) logging.getLogger('syncedgraph').setLevel(logging.INFO) logging.getLogger('graphedit').setLevel(logging.INFO) mido.set_backend('alsa_midi.mido_backend') MAX_SEND_RATE = 30 _lastSet = {} #midictlchannel:value7bit ctx = URIRef(showUri() + '/fade') graph = SyncedGraph(networking.rdfdb.url, "midifade") pages = Pages(graph, ctx) queue = EventQueue(MAX_SEND_RATE, partial(onMessage, graph, pages, ctx, _lastSet)) listenToMidiInputs(queue) connectToMidiOutput(graph, pages, _lastSet) graph.addHandler(pages.compileCurrents) await queue.run() if __name__ == '__main__': asyncio.run(main())