Changeset - f001d689b3e2
[Not reviewed]
default
0 18 0
Drew Perttula - 6 years ago 2019-05-27 06:20:38
drewp@bigasterisk.com
more py3 and typing fixes
Ignore-this: 3180bd966cac69de56b86ef6a308cad4
18 files changed with 95 insertions and 79 deletions:
0 comments (0 inline, 0 general)
bcf2000.py
Show inline comments
 
#!/usr/bin/python
 
from __future__ import division
 

	
 
import math
 
import twisted.internet.fdesc
 
from twisted.internet import reactor
 
from twisted.internet.task import LoopingCall
 
from typing import Dict
 

	
 
class BCF2000(object):
 

	
 
    control = {81 : "slider1", 82 : "slider2", 83 : "slider3", 84 : "slider4",
 
               85 : "slider5", 86 : "slider6", 87 : "slider7", 88 : "slider8",
 

	
 
                1 : "knob1",  2 : "knob2",  3 : "knob3",  4 : "knob4",
 
                5 : "knob5",  6 : "knob6",  7 : "knob7",  8 : "knob8",
 

	
 
               33 : "button-knob1", 34 : "button-knob2",
 
               35 : "button-knob3", 36 : "button-knob4",
 
               37 : "button-knob5", 38 : "button-knob6",
 
@@ -26,91 +27,91 @@ class BCF2000(object):
 
               75 : "button-lower3",  76 : "button-lower4",
 
               77 : "button-lower5",  78 : "button-lower6",
 
               79 : "button-lower7",  80 : "button-lower8",
 
               89 : "button-corner1", 90 : "button-corner2",
 
               91 : "button-corner3", 92 : "button-corner4",
 
               }
 

	
 
    def __init__(self, dev="/dev/snd/midiC2D0"):
 
        """device was usually /dev/snd/midiC1D0 but then it showed up
 
        once as C0D0. It should be autodetected"""
 
        self.devPath = dev
 
        self.dev = None
 
        self.lastValue = {} # control name : value
 
        self.lastValue: Dict[str, int] = {} # control name : value
 
        self.reopen()
 
        self.packet = ""
 
        loop = LoopingCall(self.poll)
 
        loop.start(.01)
 

	
 
    def poll(self):
 
        try:
 
            bytes = self.dev.read(3)
 
        except (IOError, AttributeError):
 
            return
 
        if len(bytes) == 0:
 
            print "midi stall, reopen slider device"
 
            print("midi stall, reopen slider device")
 
            self.reopen()
 
            return
 
        self.packet += bytes
 
        if len(self.packet) == 3:
 
            p = self.packet
 
            self.packet = ""
 
            self.packetReceived(p)
 

	
 
    def packetReceived(self, packet):
 
        b0, which, value = [ord(b) for b in packet]
 
        if b0 != 0xb0:
 
            return
 
        if which in self.control:
 
            name = self.control[which]
 
            if name.startswith("button-"):
 
                value = value > 0
 
            self.lastValue[name] = value
 
            self.valueIn(name, value)
 
        else:
 
            print "unknown control %s to %s" % (which, value)
 
            print("unknown control %s to %s" % (which, value))
 

	
 
    def reopen(self):
 
        if self.dev is not None:
 
            try:
 
                self.dev.close()
 
            except IOError:
 
                pass
 

	
 
        self.lastValue.clear()
 
        self.dev = open(self.devPath, "r+")
 
        twisted.internet.fdesc.setNonBlocking(self.dev)
 
                    
 
    def valueIn(self, name, value):
 
        """override this with your handler for when events come in
 
        from the hardware"""
 
        print "slider %s to %s" % (name, value)
 
        print("slider %s to %s" % (name, value))
 
        if name == 'slider1':
 
            for x in range(2,8+1):
 
                v2 = int(64 + 64 * math.sin(x / 3 + value / 10))
 
                self.valueOut('slider%d' % x, v2)
 
            for x in range(1,8+1):
 
                self.valueOut('button-upper%s' % x, value > x*15)
 
                self.valueOut('button-lower%s' % x, value > (x*15+7))
 

	
 
    def valueOut(self, name, value):
 
        """call this to send an event to the hardware"""
 
        if self.dev is None:
 
            return
 

	
 
        value = int(value)
 
        if self.lastValue.get(name) == value:
 
            return
 
        self.lastValue[name] = value
 
        which = [k for k,v in self.control.items() if v == name]
 
        which = [k for k,v in list(self.control.items()) if v == name]
 
        assert len(which) == 1, "unknown control name %r" % name
 
        if name.startswith('button-'):
 
            value = value * 127
 
        #print "bcf: write %s %s" % (name, value)
 
        self.dev.write(chr(0xb0) + chr(which[0]) + chr(int(value)))
 

	
 
    def close(self):
 
        self.dev.close()
 
        self.dev = None
 

	
 
if __name__ == '__main__':
 
    b = BCF2000()
bin/keyboardcomposer
Show inline comments
 
#!bin/python
 

	
 
from run_local import log
 
import cgi, time, logging
 
from optparse import OptionParser
 
import webcolors, colorsys
 
from louie import dispatcher
 
from twisted.internet import reactor, tksupport
 
from twisted.web import resource
 
from rdflib import URIRef, Literal
 
import tkinter.tix as tk
 
from typing import Dict, Tuple, List
 

	
 
from light9.Fadable import Fadable
 
from light9.subclient import SubClient
 
from light9 import showconfig, networking, prof
 
from light9.uihelpers import toplevelat
 
from light9.namespaces import L9, RDF, RDFS
 
from light9.tkdnd import initTkdnd, dragSourceRegister, dropTargetRegister
 
from light9 import clientsession
 
from rdfdb.syncedgraph import SyncedGraph
 
from light9.effect.sequencer import CodeWatcher
 
import light9.effect.effecteval
 
from light9.effect.settings import DeviceSettings
 
@@ -75,25 +76,25 @@ class SubScale(tk.Scale, Fadable):
 

	
 
class SubmasterBox(tk.Frame):
 
    """
 
    this object owns the level of the submaster (the rdf graph is the
 
    real authority)
 
    """
 

	
 
    def __init__(self, master, graph, sub, session, col, row):
 
        self.graph = graph
 
        self.sub = sub
 
        self.session = session
 
        self.col, self.row = col, row
 
        bg = self.graph.value(sub, L9.color, default='#000000')
 
        bg = self.graph.value(sub, L9['color'], default='#000000')
 
        rgb = webcolors.hex_to_rgb(bg)
 
        hsv = colorsys.rgb_to_hsv(*[x / 255 for x in rgb])
 
        darkBg = webcolors.rgb_to_hex(
 
            tuple([
 
                int(x * 255) for x in colorsys.hsv_to_rgb(hsv[0], hsv[1], .2)
 
            ]))
 
        tk.Frame.__init__(self, master, bd=1, relief='raised', bg=bg)
 
        self.name = self.graph.label(sub)
 
        self.slider_var = tk.DoubleVar()
 
        self.pauseTrace = False
 
        self.scale = SubScale(self, variable=self.slider_var, width=20)
 

	
 
@@ -177,27 +178,27 @@ class SubmasterBox(tk.Frame):
 
        self.namelabel.config(
 
            text=self.graph.label(self.sub) or shortUri(self.sub))
 

	
 

	
 
class KeyboardComposer(tk.Frame, SubClient):
 

	
 
    def __init__(self, root, graph, session, hw_sliders=True):
 
        tk.Frame.__init__(self, root, bg='black')
 
        SubClient.__init__(self)
 
        self.graph = graph
 
        self.session = session
 

	
 
        self.subbox = {}  # sub uri : SubmasterBox
 
        self.slider_table = {}  # coords : SubmasterBox
 
        self.rows = []  # this holds Tk Frames for each row
 
        self.subbox: Dict[URIRef, SubmasterBox] = {}  # sub uri : SubmasterBox
 
        self.slider_table: Dict[Tuple[int, int], SubmasterBox] = {}  # coords : SubmasterBox
 
        self.rows: List[tk.Frame] = []  # this holds Tk Frames for each row
 

	
 
        self.current_row = 0  # should come from session graph
 

	
 
        self.use_hw_sliders = hw_sliders
 
        self.connect_to_hw(hw_sliders)
 

	
 
        self.make_key_hints()
 
        self.make_buttons()
 

	
 
        self.graph.addHandler(self.redraw_sliders)
 

	
 
        self.codeWatcher = CodeWatcher(
 
@@ -261,25 +262,25 @@ class KeyboardComposer(tk.Frame, SubClie
 
        col = 0
 
        last_group = None
 

	
 
        withgroups = []
 
        for effect in self.graph.subjects(RDF.type, L9['Effect']):
 
            withgroups.append((self.graph.value(effect, L9['group']),
 
                               self.graph.value(effect, L9['order']),
 
                               self.graph.label(effect), effect))
 
        withgroups.sort()
 

	
 
        log.info("withgroups %s", withgroups)
 

	
 
        self.effectEval = {}
 
        self.effectEval: Dict[URIRef, light9.effect.effecteval.EffectEval] = {}
 
        imp.reload(light9.effect.effecteval)
 
        simpleOutputs = SimpleOutputs(self.graph)
 
        for group, order, sortLabel, effect in withgroups:
 
            if col == 0 or group != last_group:
 
                row = self.make_row(group)
 
                rowcount += 1
 
                col = 0
 

	
 
            subbox = SubmasterBox(row, self.graph, effect, self.session, col,
 
                                  rowcount)
 
            subbox.place(relx=col / 8, rely=0, relwidth=1 / 8, relheight=1)
 
            self.subbox[effect] = self.slider_table[(rowcount, col)] = subbox
bin/rdfdb
Show inline comments
 
#!bin/python
 
import run_local  # noqa
 
import os
 
from rdflib import URIRef
 
from light9 import networking, showconfig
 
import rdfdb.service
 

	
 
rdfdb.service.main(
 
    dirUriMap={
 
        os.environ['LIGHT9_SHOW'].rstrip('/') + '/': showconfig.showUri() + '/'
 
        os.environ['LIGHT9_SHOW'].encode('ascii').rstrip(b'/') + b'/':
 
        URIRef(showconfig.showUri() + '/')
 
    },
 
    prefixes={
 
        'show': showconfig.showUri() + '/',
 
        '': 'http://light9.bigasterisk.com/',
 
        'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
 
        'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
 
        'xsd': 'http://www.w3.org/2001/XMLSchema#',
 
        'effect': 'http://light9.bigasterisk.com/effect/',
 
        'dev': 'http://light9.bigasterisk.com/device/',
 
        'show': URIRef(showconfig.showUri() + '/'),
 
        '': URIRef('http://light9.bigasterisk.com/'),
 
        'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'),
 
        'rdfs': URIRef('http://www.w3.org/2000/01/rdf-schema#'),
 
        'xsd': URIRef('http://www.w3.org/2001/XMLSchema#'),
 
        'effect': URIRef('http://light9.bigasterisk.com/effect/'),
 
        'dev': URIRef('http://light9.bigasterisk.com/device/'),
 
    },
 
    port=networking.rdfdb.port,
 
)
light9/collector/collector_client.py
Show inline comments
 
@@ -2,25 +2,24 @@ from light9 import networking
 
from light9.effect.settings import DeviceSettings
 
from twisted.internet import defer
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 
import json, time, logging
 
import treq
 

	
 
log = logging.getLogger('coll_client')
 

	
 
_zmqClient = None
 

	
 

	
 
class TwistedZmqClient(object):
 

	
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 

	
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings):
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
light9/effect/effecteval.py
Show inline comments
 
from rdflib import Literal
 
from rdflib import Literal, URIRef
 
from light9.namespaces import L9, DEV
 
from webcolors import rgb_to_hex, hex_to_rgb
 
from colorsys import hsv_to_rgb
 
import math
 
from noise import pnoise1
 
import logging
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.scale import scale
 
from typing import Dict, Tuple, Any
 
import random
 
random.seed(0)
 
print("reload effecteval")
 

	
 
log = logging.getLogger('effecteval')
 

	
 

	
 
def literalColor(rnorm, gnorm, bnorm):
 
    return Literal(
 
        rgb_to_hex([int(rnorm * 255),
 
                    int(gnorm * 255),
 
                    int(bnorm * 255)]))
 
@@ -67,25 +68,25 @@ class EffectEval(object):
 
        the effect code.
 
        """
 
        # both callers need to apply note overrides
 
        effectSettings = dict(
 
            effectSettings
 
        )  # we should make everything into nice float and Color objects too
 

	
 
        strength = float(effectSettings[L9['strength']])
 
        if strength <= 0:
 
            return DeviceSettings(self.graph, []), {'zero': True}
 

	
 
        report = {}
 
        out = {}  # (dev, attr): value
 
        out: Dict[Tuple[URIRef, URIRef], Any] = {}  # (dev, attr): value
 

	
 
        out.update(
 
            self.simpleOutputs.values(
 
                self.effect, strength,
 
                effectSettings.get(L9['colorScale'], None)))
 

	
 
        if self.effect.startswith(L9['effect/']):
 
            tail = 'effect_' + self.effect[len(L9['effect/']):]
 
            try:
 
                func = globals()[tail]
 
            except KeyError:
 
                report['error'] = 'effect code not found for %s' % self.effect
light9/effect/sequencer.py
Show inline comments
 
@@ -2,68 +2,70 @@
 
copies from effectloop.py, which this should replace
 
'''
 

	
 
from louie import dispatcher
 
from rdflib import URIRef
 
from twisted.internet import reactor
 
from twisted.internet import defer
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 
import cyclone.sse
 
import logging, bisect, time
 
import traceback
 
from typing import Any, Callable, Dict, List, Tuple
 

	
 
from light9.namespaces import L9, RDF
 
from light9.vidref.musictime import MusicTime
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from rdfdb.syncedgraph import SyncedGraph
 

	
 
from greplin import scales
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection(
 
    '/sequencer/',
 
    scales.PmfStat('update'),
 
    scales.PmfStat('compileGraph'),
 
    scales.PmfStat('compileSong'),
 
    scales.DoubleStat('recentFps'),
 
)
 

	
 

	
 
class Note(object):
 

	
 
    def __init__(self, graph, uri, effectevalModule, simpleOutputs):
 
        g = self.graph = graph
 
        self.uri = uri
 
        self.effectEval = effectevalModule.EffectEval(
 
            graph, g.value(uri, L9['effectClass']), simpleOutputs)
 
        self.baseEffectSettings = {}  # {effectAttr: value}
 
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
 
        for s in g.objects(uri, L9['setting']):
 
            settingValues = dict(g.predicate_objects(s))
 
            ea = settingValues[L9['effectAttr']]
 
            self.baseEffectSettings[ea] = settingValues[L9['value']]
 

	
 
        def floatVal(s, p):
 
            return float(g.value(s, p).toPython())
 

	
 
        originTime = floatVal(uri, L9['originTime'])
 
        self.points = []
 
        self.points: List[Tuple[float, float]] = []
 
        for curve in g.objects(uri, L9['curve']):
 
            self.points.extend(
 
                self.getCurvePoints(curve, L9['strength'], originTime))
 
        self.points.sort()
 

	
 
    def getCurvePoints(self, curve, attr, originTime):
 
    def getCurvePoints(self, curve, attr, originTime) -> List[Tuple[float, float]]:
 
        points = []
 
        po = list(self.graph.predicate_objects(curve))
 
        if dict(po).get(L9['attr'], None) != attr:
 
            return []
 
        for point in [row[1] for row in po if row[0] == L9['point']]:
 
            po2 = dict(self.graph.predicate_objects(point))
 
            points.append(
 
                (originTime + float(po2[L9['time']]), float(po2[L9['value']])))
 
        return points
 

	
 
    def activeAt(self, t):
 
        return self.points[0][0] <= t <= self.points[-1][0]
 
@@ -120,34 +122,35 @@ class CodeWatcher(object):
 

	
 
        def go():
 
            log.info("reload effecteval")
 
            imp.reload(effecteval)
 
            self.onChange()
 

	
 
        # in case we got an event at the start of the write
 
        reactor.callLater(.1, go)
 

	
 

	
 
class Sequencer(object):
 

	
 
    def __init__(self, graph, sendToCollector, fps=40):
 
    def __init__(self, graph: SyncedGraph, sendToCollector: Callable[[DeviceSettings], None],
 
                 fps=40):
 
        self.graph = graph
 
        self.fps = fps
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 

	
 
        self.recentUpdateTimes = []
 
        self.lastStatLog = 0
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
        self._compileGraphCall = None
 
        self.notes = {}  # song: [notes]
 
        self.notes: Dict[URIRef, List[Note]] = {}  # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.updateLoop()
 

	
 
        self.codeWatcher = CodeWatcher(
 
            onChange=lambda: self.graph.addHandler(self.compileGraph))
 

	
 
    @stats.compileGraph.time()
 
    def compileGraph(self):
 
        """rebuild our data from the graph"""
 
        t1 = time.time()
 
        g = self.graph
 
@@ -157,25 +160,25 @@ class Sequencer(object):
 
        log.info('compileGraph took %.2f ms', 1000 * (time.time() - t1))
 

	
 
    @stats.compileSong.time()
 
    def compileSong(self, song):
 
        t1 = time.time()
 

	
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            self.notes[song].append(
 
                Note(self.graph, note, effecteval, self.simpleOutputs))
 
        log.info('  compile %s took %.2f ms', song, 1000 * (time.time() - t1))
 

	
 
    def updateLoop(self):
 
    def updateLoop(self) -> None:
 
        # print "updateLoop"
 
        now = time.time()
 
        self.recentUpdateTimes = self.recentUpdateTimes[-40:] + [now]
 
        stats.recentFps = len(self.recentUpdateTimes) / (
 
            self.recentUpdateTimes[-1] - self.recentUpdateTimes[0] + .0001)
 
        if now > self.lastStatLog + .2:
 
            dispatcher.send(
 
                'state',
 
                update={
 
                    'recentDeltas':
 
                    sorted([
 
                        round(t1 - t0, 4)
 
@@ -223,25 +226,25 @@ class Sequencer(object):
 
            dispatcher.send('state', update={'songNotes': noteReports})
 
            return self.sendToCollector(
 
                DeviceSettings.fromList(self.graph, settings))
 
        except Exception:
 
            traceback.print_exc()
 
            raise
 

	
 

	
 
class Updates(cyclone.sse.SSEHandler):
 

	
 
    def __init__(self, application, request, **kwargs):
 
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
        self.state = {}
 
        self.state: Dict = {}
 
        dispatcher.connect(self.updateState, 'state')
 
        self.numConnected = 0
 

	
 
    def updateState(self, update):
 
        self.state.update(update)
 

	
 
    def bind(self):
 
        self.numConnected += 1
 

	
 
        if self.numConnected == 1:
 
            self.loop()
 

	
light9/effect/settings.py
Show inline comments
 
"""
 
Data structure and convertors for a table of (device,attr,value)
 
rows. These might be effect attrs ('strength'), device attrs ('rx'),
 
or output attrs (dmx channel).
 
"""
 
import decimal
 
import numpy
 
from rdflib import URIRef, Literal
 
from light9.namespaces import RDF, L9
 
import logging
 
log = logging.getLogger('settings')
 
from light9.collector.device import resolve
 

	
 
from typing import Sequence, Dict, Union, List
 

	
 
def parseHex(h):
 
    if h[0] != '#': raise ValueError(h)
 
    return [int(h[i:i + 2], 16) for i in (1, 3, 5)]
 

	
 

	
 
def parseHexNorm(h):
 
    return [x / 255 for x in parseHex(h)]
 

	
 

	
 
def toHex(rgbFloat):
 
    return '#%02x%02x%02x' % tuple(
 
        max(0, min(255, int(v * 255))) for v in rgbFloat)
 
def toHex(rgbFloat: Sequence[float]) -> str:
 
    assert len(rgbFloat) == 3
 
    scaled = (max(0, min(255, int(v * 255))) for v in rgbFloat)
 
    return '#%02x%02x%02x' % tuple(scaled) # type: ignore
 

	
 

	
 
def getVal(graph, subj):
 
    lit = graph.value(subj, L9['value']) or graph.value(subj, L9['scaledValue'])
 
    ret = lit.toPython()
 
    if isinstance(ret, decimal.Decimal):
 
        ret = float(ret)
 
    return ret
 

	
 

	
 
class _Settings(object):
 
    """
 
    default values are 0 or '#000000'. Internal rep must not store zeros or some
 
    comparisons will break.
 
    """
 

	
 
    def __init__(self, graph, settingsList):
 
        self.graph = graph  # for looking up all possible attrs
 
        self._compiled = {}  # dev: { attr: val }; val is number or colorhex
 
        self._compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {}  # dev: { attr: val }; val is number or colorhex
 
        for row in settingsList:
 
            self._compiled.setdefault(row[0], {})[row[1]] = row[2]
 
        # self._compiled may not be final yet- see _fromCompiled
 
        self._delZeros()
 

	
 
    @classmethod
 
    def _fromCompiled(cls, graph, compiled):
 
        obj = cls(graph, [])
 
        obj._compiled = compiled
 
        obj._delZeros()
 
        return obj
 

	
 
@@ -59,25 +60,25 @@ class _Settings(object):
 
    def fromResource(cls, graph, subj):
 
        settingsList = []
 
        with graph.currentState() as g:
 
            for s in g.objects(subj, L9['setting']):
 
                d = g.value(s, L9['device'])
 
                da = g.value(s, L9['deviceAttr'])
 
                v = getVal(g, s)
 
                settingsList.append((d, da, v))
 
        return cls(graph, settingsList)
 

	
 
    @classmethod
 
    def fromVector(cls, graph, vector, deviceAttrFilter=None):
 
        compiled = {}
 
        compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {}
 
        i = 0
 
        for (d, a) in cls(graph, [])._vectorKeys(deviceAttrFilter):
 
            if a == L9['color']:
 
                v = toHex(vector[i:i + 3])
 
                i += 3
 
            else:
 
                v = vector[i]
 
                i += 1
 
            compiled.setdefault(d, {})[a] = v
 
        return cls._fromCompiled(graph, compiled)
 

	
 
    @classmethod
 
@@ -177,25 +178,25 @@ class _Settings(object):
 
    def asList(self):
 
        """old style list of (dev, attr, val) tuples"""
 
        out = []
 
        for dev, av in self._compiled.items():
 
            for attr, val in av.items():
 
                out.append((dev, attr, val))
 
        return out
 

	
 
    def devices(self):
 
        return list(self._compiled.keys())
 

	
 
    def toVector(self, deviceAttrFilter=None):
 
        out = []
 
        out: List[float] = []
 
        for dev, attr in self._vectorKeys(deviceAttrFilter):
 
            v = self.getValue(dev, attr)
 
            if attr == L9['color']:
 
                out.extend(parseHexNorm(v))
 
            else:
 
                out.append(v)
 
        return out
 

	
 
    def byDevice(self):
 
        for dev, av in self._compiled.items():
 
            yield dev, self.__class__._fromCompiled(self.graph, {dev: av})
 

	
light9/effect/simple_outputs.py
Show inline comments
 
import traceback
 
from light9.namespaces import L9, RDF
 
from light9.effect.scale import scale
 

	
 
from typing import Dict, List, Tuple, Any
 
from rdflib import URIRef
 

	
 
class SimpleOutputs(object):
 

	
 
    def __init__(self, graph):
 
        self.graph = graph
 

	
 
        # effect : [(dev, attr, value, isScaled)]
 
        self.effectOutputs = {}
 
        self.effectOutputs: Dict[URIRef, List[Tuple[URIRef, URIRef, Any, bool]]] = {}
 

	
 
        self.graph.addHandler(self.updateEffectsFromGraph)
 

	
 
    def updateEffectsFromGraph(self):
 
        for effect in self.graph.subjects(RDF.type, L9['Effect']):
 
            settings = []
 
            for setting in self.graph.objects(effect, L9['setting']):
 
                settingValues = dict(self.graph.predicate_objects(setting))
 
                try:
 
                    d = settingValues.get(L9['device'], None)
 
                    a = settingValues.get(L9['deviceAttr'], None)
 
                    v = settingValues.get(L9['value'], None)
light9/namespaces.py
Show inline comments
 
from rdflib import Namespace, RDF, RDFS  # noqa
 
from typing import Dict
 

	
 

	
 
# Namespace was showing up in profiles
 
class FastNs(object):
 

	
 
    def __init__(self, base):
 
        self.ns = Namespace(base)
 
        self.cache = {}
 
        self.cache: Dict[str, Namespace] = {}
 

	
 
    def __getitem__(self, term):
 
        if term not in self.cache:
 
            self.cache[term] = self.ns[term]
 
        return self.cache[term]
 

	
 
    __getattr__ = __getitem__
 

	
 

	
 
L9 = FastNs("http://light9.bigasterisk.com/")
 
MUS = Namespace("http://light9.bigasterisk.com/music/")
 
XSD = Namespace("http://www.w3.org/2001/XMLSchema#")
light9/networking.py
Show inline comments
 
from urllib.parse import urlparse
 
from urllib.parse import splitport
 
from .showconfig import getGraph, showUri
 
from .namespaces import L9
 

	
 

	
 
class ServiceAddress(object):
 

	
 
    def __init__(self, service):
 
        self.service = service
 

	
 
    def _url(self):
 
        graph = getGraph()
 
        net = graph.value(showUri(), L9['networking'])
 
        ret = graph.value(net, self.service)
 
        if ret is None:
 
            raise ValueError("no url for %s -> %s -> %s" %
 
                             (showUri(), L9['networking'], self.service))
 
        return str(ret)
 

	
 
    @property
 
    def port(self):
 
        _, netloc, _, _, _, _ = urlparse(self._url())
 
        host, port = splitport(netloc)
 
        return int(port)
 
        return urlparse(self._url()).port
 

	
 
    @property
 
    def host(self):
 
        _, netloc, _, _, _, _ = urlparse(self._url())
 
        host, port = splitport(netloc)
 
        return host
 
        return urlparse(self._url()).hostname
 

	
 
    @property
 
    def url(self):
 
        return self._url()
 

	
 
    value = url
 

	
 
    def path(self, more):
 
        return self.url + str(more)
 

	
 

	
 
captureDevice = ServiceAddress(L9['captureDevice'])
light9/prof.py
Show inline comments
 
import sys, traceback, time, logging
 
from typing import Any, Dict
 
log = logging.getLogger()
 

	
 

	
 
def run(main, profile=None):
 
    if not profile:
 
        main()
 
        return
 

	
 
    if profile == 'hotshot':
 
        import hotshot, hotshot.stats
 
        p = hotshot.Profile("/tmp/pro")
 
        p.runcall(main)
 
@@ -21,25 +22,25 @@ def run(main, profile=None):
 
        finally:
 
            statprof.stop()
 
            statprof.display()
 

	
 

	
 
def watchPoint(filename, lineno, event="call"):
 
    """whenever we hit this line, print a stack trace. event='call'
 
    for lines that are function definitions, like what a profiler
 
    gives you.
 

	
 
    Switch to 'line' to match lines inside functions. Execution speed
 
    will be much slower."""
 
    seenTraces = {}  # trace contents : count
 
    seenTraces: Dict[Any, int] = {}  # trace contents : count
 

	
 
    def trace(frame, ev, arg):
 
        if ev == event:
 
            if (frame.f_code.co_filename, frame.f_lineno) == (filename, lineno):
 
                stack = ''.join(traceback.format_stack(frame))
 
                if stack not in seenTraces:
 
                    print("watchPoint hit")
 
                    print(stack)
 
                    seenTraces[stack] = 1
 
                else:
 
                    seenTraces[stack] += 1
 

	
light9/showconfig.py
Show inline comments
 
import logging, warnings
 
from twisted.python.filepath import FilePath
 
from os import path, getenv
 
from rdflib import Graph
 
from rdflib import URIRef
 
from rdflib import URIRef, Literal
 
from .namespaces import L9
 
from typing import List, cast
 
log = logging.getLogger('showconfig')
 

	
 
_config = None  # graph
 

	
 

	
 
def getGraph():
 
def getGraph() -> Graph:
 
    warnings.warn(
 
        "code that's using showconfig.getGraph should be "
 
        "converted to use the sync graph",
 
        stacklevel=2)
 
    global _config
 
    if _config is None:
 
        graph = Graph()
 
        # note that logging is probably not configured the first time
 
        # we're in here
 
        warnings.warn("reading n3 files around %r" % root())
 
        for f in FilePath(root()).globChildren("*.n3") + FilePath(
 
                root()).globChildren("build/*.n3"):
 
            graph.parse(location=f.path, format='n3')
 
        _config = graph
 
    return _config
 

	
 

	
 
def root():
 
def root() -> bytes:
 
    r = getenv("LIGHT9_SHOW")
 
    if r is None:
 
        raise OSError(
 
            "LIGHT9_SHOW env variable has not been set to the show root")
 
    return r
 
    return r.encode('ascii')
 

	
 

	
 
_showUri = None
 

	
 

	
 
def showUri():
 
def showUri() -> URIRef:
 
    """Return the show URI associated with $LIGHT9_SHOW."""
 
    global _showUri
 
    if _showUri is None:
 
        _showUri = URIRef(open(path.join(root(), 'URI')).read().strip())
 
        _showUri = URIRef(open(path.join(root(), b'URI')).read().strip())
 
    return _showUri
 

	
 

	
 
def songOnDisk(song):
 
def songOnDisk(song: URIRef) -> bytes:
 
    """given a song URI, where's the on-disk file that mpd would read?"""
 
    graph = getGraph()
 
    root = graph.value(showUri(), L9['musicRoot'])
 
    if not root:
 
        raise ValueError("%s has no :musicRoot" % showUri())
 

	
 
    name = graph.value(song, L9['songFilename'])
 
    if not name:
 
        raise ValueError("Song %r has no :songFilename" % song)
 

	
 
    return path.abspath(path.join(root, name))
 
    return path.abspath(path.join(
 
        cast(Literal, root).toPython(),
 
        cast(Literal, name).toPython()))
 

	
 

	
 
def songFilenameFromURI(uri):
 
def songFilenameFromURI(uri: URIRef) -> bytes:
 
    """
 
    'http://light9.bigasterisk.com/show/dance2007/song8' -> 'song8'
 

	
 
    everything that uses this should be deprecated for real URIs
 
    everywhere"""
 
    assert isinstance(uri, URIRef)
 
    return uri.split('/')[-1]
 
    return str(uri).split('/')[-1].encode('ascii')
 

	
 

	
 
def getSongsFromShow(graph, show):
 
def getSongsFromShow(graph: Graph, show: URIRef) -> List[URIRef]:
 
    playList = graph.value(show, L9['playList'])
 
    if not playList:
 
        raise ValueError("%r has no l9:playList" % show)
 
    # The patch in https://github.com/RDFLib/rdflib/issues/305 fixed a
 
    # serious bug here.
 
    songs = list(graph.items(playList))
 

	
 
    return songs
 

	
 

	
 
def curvesDir():
 
    return path.join(root(), "curves")
 
    return path.join(root(), b"curves")
 

	
 

	
 
def subFile(subname):
 
    return path.join(root(), "subs", subname)
 
    return path.join(root(), b"subs", subname)
 

	
 

	
 
def subsDir():
 
    return path.join(root(), 'subs')
 
    return path.join(root(), b'subs')
light9/tkdnd.py
Show inline comments
 
from glob import glob
 
from os.path import join, basename
 

	
 
from typing import Dict, Any
 

	
 
class TkdndEvent(object):
 
    """
 
    see http://www.ellogon.org/petasis/tcltk-projects/tkdnd/tkdnd-man-page
 
    for details on the fields
 

	
 
    The longer attribute names (action instead of %A) were made up for
 
    this API.
 

	
 
    Not all attributes are visible yet, since I have not thought
 
    through what conversions they should receive and I don't want to
 
    unnecessarily change their types later.
 
@@ -36,25 +36,25 @@ class TkdndEvent(object):
 
        return (ev,)
 

	
 
    tclSubstitutions = ' '.join(sorted(substitutions.keys()))
 

	
 
    def __repr__(self):
 
        return "<TkdndEvent %r>" % self.__dict__
 

	
 

	
 
class Hover(object):
 

	
 
    def __init__(self, widget, style):
 
        self.widget, self.style = widget, style
 
        self.oldStyle = {}
 
        self.oldStyle: Dict[Any, Any] = {}
 

	
 
    def set(self, ev):
 
        for k, v in list(self.style.items()):
 
            self.oldStyle[k] = self.widget.cget(k)
 
        self.widget.configure(**self.style)
 
        return ev.action
 

	
 
    def restore(self, ev):
 
        self.widget.configure(**self.oldStyle)
 

	
 

	
 
def initTkdnd(tk, tkdndBuildDir):
light9/uihelpers.py
Show inline comments
 
"""all the tiny tk helper functions"""
 

	
 
#from Tkinter import Button
 
import logging, time
 
from rdflib import Literal
 
from tkinter.tix import Button, Toplevel, Tk, IntVar, Entry, DoubleVar
 
import tkinter
 
from light9.namespaces import L9
 
from typing import Dict
 

	
 
log = logging.getLogger("toplevel")
 

	
 
windowlocations = {
 
    'sub': '425x738+00+00',
 
    'console': '168x24+848+000',
 
    'leveldisplay': '144x340+870+400',
 
    'cuefader': '314x212+546+741',
 
    'effect': '24x24+0963+338',
 
    'stage': '823x683+37+030',
 
    'scenes': '504x198+462+12',
 
}
 
@@ -35,57 +36,57 @@ def toplevel_savegeometry(tl, name):
 
        # else the window never got mapped
 
    except Exception:
 
        # it's ok if there's no saved geometry
 
        pass
 

	
 

	
 
def toplevelat(name, existingtoplevel=None, graph=None, session=None):
 
    tl = existingtoplevel or Toplevel()
 
    tl.title(name)
 

	
 
    lastSaved = [None]
 
    setOnce = [False]
 
    graphSetTime = [0]
 
    graphSetTime = [0.0]
 

	
 
    def setPosFromGraphOnce():
 
        """
 
        the graph is probably initially empty, but as soon as it gives
 
        us one window position, we stop reading them
 
        """
 
        if setOnce[0]:
 
            return
 
        geo = graph.value(session, L9.windowGeometry)
 
        geo = graph.value(session, L9['windowGeometry'])
 
        log.debug("setPosFromGraphOnce %s", geo)
 

	
 
        setOnce[0] = True
 
        graphSetTime[0] = time.time()
 
        if geo is not None and geo != lastSaved[0]:
 
            tl.geometry(geo)
 
            lastSaved[0] = geo
 

	
 
    def savePos(ev):
 
        geo = tl.geometry()
 
        if not isinstance(ev.widget, (Tk, tkinter.Tk)):
 
            # I think these are due to internal widget size changes,
 
            # not the toplevel changing
 
            return
 
        # this is trying to not save all the startup automatic window
 
        # sizes. I don't have a better plan for this yet.
 
        if graphSetTime[0] == 0 or time.time() < graphSetTime[0] + 3:
 
            return
 
        if not setOnce[0]:
 
            return
 
        lastSaved[0] = geo
 
        log.debug("saving position %s", geo)
 
        graph.patchObject(session, session, L9.windowGeometry, Literal(geo))
 
        graph.patchObject(session, session, L9['windowGeometry'], Literal(geo))
 

	
 
    if graph is not None and session is not None:
 
        graph.addHandler(setPosFromGraphOnce)
 

	
 
    if name in windowlocations:
 
        tl.geometry(positionOnCurrentDesktop(windowlocations[name]))
 

	
 
    if graph is not None:
 
        tl._toplevelat_funcid = tl.bind(
 
            "<Configure>", lambda ev, tl=tl, name=name: savePos(ev))
 

	
 
    return tl
 
@@ -133,33 +134,33 @@ def eventtoparent(ev, sequence):
 
    if par != ".":
 
        ev.widget.nametowidget(par).event_generate(sequence, **evdict)
 
    #else the event made it all the way to the top, unhandled
 

	
 

	
 
def colorlabel(label):
 
    """color a label based on its own text"""
 
    txt = label['text'] or "0"
 
    lev = float(txt) / 100
 
    low = (80, 80, 180)
 
    high = (255, 55, 0o50)
 
    out = [int(l + lev * (h - l)) for h, l in zip(high, low)]
 
    col = "#%02X%02X%02X" % tuple(out)
 
    col = "#%02X%02X%02X" % tuple(out) # type: ignore
 
    label.config(bg=col)
 

	
 

	
 
# TODO: get everyone to use this
 
def colorfade(low, high, percent):
 
    '''not foolproof.  make sure 0 < percent < 1'''
 
    out = [int(l + percent * (h - l)) for h, l in zip(high, low)]
 
    col = "#%02X%02X%02X" % tuple(out)
 
    col = "#%02X%02X%02X" % tuple(out) # type: ignore
 
    return col
 

	
 

	
 
def colortotuple(anytkobj, colorname):
 
    'pass any tk object and a color name, like "yellow"'
 
    rgb = anytkobj.winfo_rgb(colorname)
 
    return [v / 256 for v in rgb]
 

	
 

	
 
class Togglebutton(Button):
 
    """works like a single radiobutton, but it's a button so the
 
    label's on the button face, not to the side. the optional command
 
@@ -207,26 +208,26 @@ class Togglebutton(Button):
 
        self.state = newstate
 
        if newstate:  # set
 
            self.config(bg=self.downcolor, relief='sunken')
 
        else:  # unset
 
            self.config(bg=self._origbkg, relief='raised')
 
        return "break"
 

	
 

	
 
class FancyDoubleVar(DoubleVar):
 

	
 
    def __init__(self, master=None):
 
        DoubleVar.__init__(self, master)
 
        self.callbacklist = {}  # cbname : mode
 
        self.namedtraces = {}  # name : cbname
 
        self.callbacklist: Dict[str, str] = {}  # cbname : mode
 
        self.namedtraces: Dict[str, str] = {}  # name : cbname
 

	
 
    def trace_variable(self, mode, callback):
 
        """Define a trace callback for the variable.
 

	
 
        MODE is one of "r", "w", "u" for read, write, undefine.
 
        CALLBACK must be a function which is called when
 
        the variable is read, written or undefined.
 

	
 
        Return the name of the callback.
 
        """
 
        cbname = self._master._register(callback)
 
        self._tk.call("trace", "variable", self._name, mode, cbname)
light9/vidref/musictime.py
Show inline comments
 
import time, json, logging
 
from light9 import networking
 
from twisted.internet import reactor
 
from cyclone.httpclient import fetch
 
from typing import Dict
 
log = logging.getLogger()
 

	
 

	
 
class MusicTime(object):
 
    """
 
    fetch times from ascoltami in a background thread; return times
 
    upon request, adjusted to be more precise with the system clock
 
    """
 

	
 
    def __init__(self,
 
                 period=.2,
 
                 onChange=lambda position: None,
 
@@ -19,25 +20,25 @@ class MusicTime(object):
 

	
 
        We call onChange with the time in seconds and the total time
 

	
 
        The choice of period doesn't need to be tied to framerate,
 
        it's more the size of the error you can tolerate (since we
 
        make up times between the samples, and we'll just run off the
 
        end of a song)
 
        """
 
        self.period = period
 
        self.hoverPeriod = .05
 
        self.onChange = onChange
 

	
 
        self.position = {}
 
        self.position: Dict[str, float] = {}
 
        # driven by our pollCurvecalcTime and also by Gui.incomingTime
 
        self.lastHoverTime = None  # None means "no recent value"
 
        self.pollMusicTime()
 
        if pollCurvecalc:
 
            self.pollCurvecalcTime()
 

	
 
    def getLatest(self, frameTime=None):
 
        """
 
        dict with 't' and 'song', etc.
 

	
 
        frameTime is the timestamp from the camera, which will be used
 
        instead of now.
 
@@ -116,15 +117,15 @@ class MusicTime(object):
 
            self.lastHoverTime = None
 
            reactor.callLater(2, self.pollCurvecalcTime)
 

	
 
        d = fetch(networking.curveCalc.path("hoverTime"))
 
        d.addCallback(cb)
 
        d.addErrback(eb)  # note this includes errors in cb()
 

	
 
    def sendTime(self, t):
 
        """request that the player go to this time"""
 
        fetch(
 
            method=b'POST',
 
            url=networking.musicPlayer.path('time'),
 
            body=json.dumps({"t": t}),
 
            postdata=json.dumps({"t": t}).encode('utf8'),
 
            headers={b"content-type": [b"application/json"]},
 
        )
light9/web/websocket.js
Show inline comments
 
/*
 
  url is now relative to the window location
 
  url is now relative to the window location. Note that nginx may drop
 
  the connection after 60sec of inactivity.
 
*/
 
function reconnectingWebSocket(url, onMessage) {
 
    var pong = 0;
 
    
 
    var fullUrl = (
 
        "ws://"
 
            + window.location.host
 
            + window.location.pathname
 
            + (window.location.pathname.match(/\/$/) ? "" : "/")
 
            + url);
 
    function connect() {
 
        var ws = new WebSocket(fullUrl);
requirements.txt
Show inline comments
 
@@ -22,20 +22,21 @@ toposort==1.5
 
treq==18.6.0
 
txzmq==0.8.0
 
typing==3.6.1
 
watchdog==0.8.3
 
webcolors==1.7
 
yapf==0.27.0
 

	
 
coverage==4.3.4
 
ipdb==0.10.2
 
ipython==5.3.0
 
mypy==0.701
 
flake8
 
typing_extensions
 

	
 
git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
 
git+http://github.com/11craft/louie.git@f18bb71010c114eca9c6b88c96453340e3b39454#egg=louie
 
git+http://github.com/webpy/webpy@ace0f8954c28311004b33c7a513c6e40a3c02470#egg=web
 
https://github.com/drewp/cyclone/archive/python3.zip#egg=cyclone
 

	
 
cycloneerr==0.3.0
 
rdfdb==0.15.0
tasks.py
Show inline comments
 
@@ -21,34 +21,37 @@ bin_sources = [
 
        'bin/run_local.py',
 
        'bin/subcomposer',
 
        'bin/subserver',
 
        'bin/vidref',
 
        'bin/vidrefsetup',
 
        'bin/wavecurve',
 
    ]
 
def pkg_sources():
 
    return glob.glob('light9/**/*.py', recursive=True)
 

	
 
@task
 
def mypy(ctx):
 
    print('\n\n')
 
    def run(sources):
 
        ss = ' '.join(sources)
 
        ctx.run(f'MYPYPATH=stubs env/bin/mypy --check-untyped-defs {ss}',
 
        ctx.run(f'MYPYPATH=stubs:/my/proj/rdfdb env/bin/mypy --check-untyped-defs {ss}',
 
                pty=True, warn=True)
 

	
 
    sources = ' '.join(bin_sources + pkg_sources())
 
    ctx.run(f'env/bin/flake8 --ignore=E115,E123,E124,E126,E225,E231,E261,E262,E265,E301,E302,E303,E305,E306,E401,E402,E501,E701,E731,W291,W293,W391,W504 {sources}', warn=True)
 

	
 
    sources = ' '.join(pkg_sources())
 
    for src in bin_sources:
 
        print(f"mypy {src}")
 
        run([src])# + pkg_sources())
 
    run(['bin/rdfdb'])
 
    run(['bin/keyboardcomposer'])
 
    #for src in bin_sources:
 
    #    print(f"mypy {src}")
 
    #    run([src])# + pkg_sources())
 
@task
 
def reformat(ctx):
 
    ctx.run("env/bin/yapf --verbose --parallel --in-place --style google light9/**/*.py `file --no-pad  bin/* | grep 'Python script' | perl -lpe 's/:.*//'`")
 
    
 
@task
 
def test(ctx):
 
    ctx.run('docker build -f Dockerfile.build -t light9_build:latest .')
 
    ctx.run('docker run --rm -it -v `pwd`:/opt light9_build:latest'
 
            ' nose2 -v light9.currentstategraphapi_test light9.graphfile_test',
 
            pty=True)
0 comments (0 inline, 0 general)