view bin/inputquneo @ 1870:51a9b23db357

rm stubs/rdfdb; point mypy to the real code instead Ignore-this: cd59283a21e2e593682e105d6507a462
author Drew Perttula <drewp@bigasterisk.com>
date Mon, 27 May 2019 00:41:38 +0000
parents 3c523c71da29
children ccdfdc8183ad
line wrap: on
line source

#!bin/python
"""
read Quneo midi events, write to curvecalc and maybe to effects
"""

from run_local import log
import logging, urllib.request, urllib.parse, urllib.error
import cyclone.web, cyclone.httpclient
from rdflib import URIRef
from twisted.internet import reactor, task
from light9.curvecalc.client import sendLiveInputPoint
from light9.namespaces import L9, RDF
from rdfdb.syncedgraph import SyncedGraph
from light9 import networking

import sys
sys.path.append('/usr/lib/python2.7/dist-packages')  # For pygame
import pygame.midi

curves = {
    23: URIRef('http://light9.bigasterisk.com/show/dance2014/song1/curve/c-2'),
    24: URIRef('http://light9.bigasterisk.com/show/dance2014/song1/curve/c-3'),
    25: URIRef('http://light9.bigasterisk.com/show/dance2014/song1/curve/c-4'),
    6: URIRef('http://light9.bigasterisk.com/show/dance2014/song1/curve/c-5'),
    18: URIRef('http://light9.bigasterisk.com/show/dance2014/song1/curve/c-6'),
}


class WatchMidi(object):

    def __init__(self, graph):
        self.graph = graph
        pygame.midi.init()

        dev = self.findQuneo()
        self.inp = pygame.midi.Input(dev)
        task.LoopingCall(self.step).start(.05)

        self.noteIsOn = {}

        self.effectMap = {}  # note: effect class uri
        self.graph.addHandler(self.setupNotes)

    def setupNotes(self):
        for e in self.graph.subjects(RDF.type, L9['EffectClass']):
            qn = self.graph.value(e, L9['quneoNote'])
            if qn:
                self.effectMap[int(qn)] = e
        log.info("setup with %s effects", len(self.effectMap))

    def findQuneo(self):
        for dev in range(pygame.midi.get_count()):
            interf, name, isInput, isOutput, opened = pygame.midi.get_device_info(
                dev)
            if 'QUNEO' in name and isInput:
                return dev
        raise ValueError("didn't find quneo input device")

    def step(self):
        if not self.inp.poll():
            return
        NOTEON, NOTEOFF = 144, 128
        for ev in self.inp.read(999):
            (status, d1, d2, _), _ = ev
            if status in [NOTEON, NOTEOFF]:
                print(status, d1, d2)

            if status == NOTEON:
                if not self.noteIsOn.get(d1):
                    self.noteIsOn[d1] = True
                    try:
                        e = self.effectMap[d1]
                        cyclone.httpclient.fetch(
                            url=networking.effectEval.path('songEffects'),
                            method='POST',
                            headers={
                                'Content-Type':
                                ['application/x-www-form-urlencoded']
                            },
                            postdata=urllib.parse.urlencode([('drop', e)]),
                        )
                    except KeyError:
                        pass

            if status == NOTEOFF:
                self.noteIsOn[d1] = False

            if 0:
                # curve editing mode, not done yet
                for group in [(23, 24, 25), (6, 18)]:
                    if d1 in group:
                        if not self.noteIsOn.get(group):
                            print("start zero")

                            for d in group:
                                sendLiveInputPoint(curves[d], 0)
                            self.noteIsOn[group] = True
                        else:  # miss first update
                            sendLiveInputPoint(curves[d1], d2 / 127)

                    if status == 128:  #noteoff
                        for d in group:
                            sendLiveInputPoint(curves[d], 0)
                        self.noteIsOn[group] = False


def main():
    log.setLevel(logging.DEBUG)
    graph = SyncedGraph(networking.rdfdb.url, "inputQuneo")
    wm = WatchMidi(graph)
    reactor.run()
    del wm


main()