Changeset - 1bb30b0eaa38
[Not reviewed]
default
0 1 0
drewp@bigasterisk.com - 20 months ago 2023-06-03 22:46:04
drewp@bigasterisk.com
reformat
1 file changed with 9 insertions and 5 deletions:
0 comments (0 inline, 0 general)
light9/midifade/midifade.py
Show inline comments
 
@@ -17,51 +17,54 @@ from light9.namespaces import L9
 
from light9.newtypes import decimalLiteral
 
from light9.run_local import log
 
from light9.showconfig import showUri
 

	
 
mido.set_backend('alsa_midi.mido_backend')
 
MAX_SEND_RATE = 30
 

	
 
_lastSet = {}  #midictlchannel:value7bit
 

	
 
currentFaders = {}  # midi control channel num : FaderUri
 
ctx = URIRef(showUri() + '/fade')
 

	
 

	
 
def compileCurrents(graph):
 
    currentFaders.clear()
 
    try:
 
        new = getChansToFaders(graph)
 
    except ValueError:
 
        return  # e.g. empty-graph startup
 
    currentFaders.update(new)
 

	
 

	
 
def getGraphMappingNode(g: ReadOnlyConjunctiveGraph|SyncedGraph) -> URIRef:
 
def getGraphMappingNode(g: ReadOnlyConjunctiveGraph | SyncedGraph) -> URIRef:
 
    mapping = g.value(L9['midiControl'], L9['map'])
 
    if mapping is None:
 
        raise ValueError('no :midiControl :map ?mapping')
 
    midiDev = g.value(mapping, L9['midiDev'])
 
    ourDev = 'bcf2000'
 
    if midiDev != Literal(ourDev):
 
        raise NotImplementedError(f'need {mapping} to have :midiDev {ourDev!r}')
 
    return mapping
 

	
 

	
 
def getCurMappedPage(g: SyncedGraph):
 
    mapping = getGraphMappingNode(g)
 
    return g.value(mapping, L9['outputs'])
 

	
 
def setCurMappedPage(g: SyncedGraph, mapping: URIRef, newPage:URIRef):
 

	
 
def setCurMappedPage(g: SyncedGraph, mapping: URIRef, newPage: URIRef):
 
    g.patchObject(ctx, mapping, L9.outputs, newPage)
 

	
 

	
 
def getChansToFaders(g: SyncedGraph) -> Dict[int, URIRef]:
 
    fadePage = getCurMappedPage(g)
 
    ret = []
 
    for f in g.objects(fadePage, L9.fader):
 
        columnLit = cast(Literal, g.value(f, L9['column']))
 
        col = int(columnLit.toPython())
 
        ret.append((col, f))
 

	
 
    ret.sort()
 
    ctl_channels = list(range(81, 88 + 1))
 
    out = {}
 
    for chan, (col, f) in zip(ctl_channels, ret):
 
@@ -70,29 +73,30 @@ def getChansToFaders(g: SyncedGraph) -> 
 

	
 

	
 
def changePage(g: SyncedGraph, dp: int):
 
    """dp==-1, make the previous page active, etc. Writes to graph"""
 

	
 
    with g.currentState() as current:
 
        allPages = sorted(current.subjects(RDF.type, L9.FadePage), key=lambda fp: str(fp))
 
        mapping = getGraphMappingNode(current)
 
        curPage = current.value(mapping, L9.outputs)
 
    if curPage is None:
 
        curPage = allPages[0]
 
    idx = allPages.index(curPage)
 
    newIdx = clamp(idx + dp, 0, len(allPages)-1)
 
    newIdx = clamp(idx + dp, 0, len(allPages) - 1)
 
    print('change from ', idx, newIdx)
 
    newPage =allPages[newIdx]
 
    newPage = allPages[newIdx]
 
    setCurMappedPage(g, mapping, newPage)
 

	
 

	
 
def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
 
    log.info(f'setFader(fader={fader}, strength={strength:.03f}')
 
    valueLit = decimalLiteral(round(strength, 3))
 
    with graph.currentState() as g:
 
        fadeSet = g.value(fader, L9['setting'])
 
    if fadeSet is None:
 
        raise ValueError(f'fader {fader} has no :setting')
 
    graph.patchObject(ctx, fadeSet, L9['value'], valueLit)
 

	
 

	
 
def onMessage(graph: SyncedGraph, ctx: URIRef, m: Dict):
 
    if m['type'] == 'active_sensing':
 
@@ -146,25 +150,25 @@ class WriteBackFaders:
 
            self._update()
 
        except ValueError as e:
 
            log.warning(repr(e))
 

	
 
    def _update(self):
 
        g = self.graph
 
        nupdated = 0
 
        m = getChansToFaders(g)
 
        for midi_ctl_addr, f in m.items():
 
            fset = g.value(f, L9.setting)
 
            # could split this to a separate handler per fader
 
            value = g.value(fset, L9.value).toPython()
 
            hwcurrent = self.getCurrentValue(midi_ctl_addr) 
 
            hwcurrent = self.getCurrentValue(midi_ctl_addr)
 
            hwgoal = int(value * 127)
 
            print(f'{f} {hwcurrent=} {hwgoal=}')
 
            if abs(hwcurrent - hwgoal) > 2:
 
                self.sendToBcf(midi_ctl_addr, hwgoal)
 
                nupdated += 1
 
        log.info(f'wrote to {nupdated} of {len(m)} mapped faders')
 

	
 
    def sendToBcf(self, control, value):
 
        _lastSet[control] = value
 
        msg = mido.Message('control_change', control=control, value=value)
 
        self.bcf_out.send(msg)
 

	
0 comments (0 inline, 0 general)