Changeset - f001d689b3e2
[Not reviewed]
default
0 18 0
Drew Perttula - 6 years ago 2019-05-27 06:20:38
drewp@bigasterisk.com
more py3 and typing fixes
Ignore-this: 3180bd966cac69de56b86ef6a308cad4
18 files changed with 95 insertions and 79 deletions:
0 comments (0 inline, 0 general)
bcf2000.py
Show inline comments
 
#!/usr/bin/python
 
from __future__ import division
 

	
 
import math
 
import twisted.internet.fdesc
 
from twisted.internet import reactor
 
from twisted.internet.task import LoopingCall
 
from typing import Dict
 

	
 
class BCF2000(object):
 

	
 
    control = {81 : "slider1", 82 : "slider2", 83 : "slider3", 84 : "slider4",
 
               85 : "slider5", 86 : "slider6", 87 : "slider7", 88 : "slider8",
 

	
 
@@ -32,25 +33,25 @@ class BCF2000(object):
 

	
 
    def __init__(self, dev="/dev/snd/midiC2D0"):
 
        """device was usually /dev/snd/midiC1D0 but then it showed up
 
        once as C0D0. It should be autodetected"""
 
        self.devPath = dev
 
        self.dev = None
 
        self.lastValue = {} # control name : value
 
        self.lastValue: Dict[str, int] = {} # control name : value
 
        self.reopen()
 
        self.packet = ""
 
        loop = LoopingCall(self.poll)
 
        loop.start(.01)
 

	
 
    def poll(self):
 
        try:
 
            bytes = self.dev.read(3)
 
        except (IOError, AttributeError):
 
            return
 
        if len(bytes) == 0:
 
            print "midi stall, reopen slider device"
 
            print("midi stall, reopen slider device")
 
            self.reopen()
 
            return
 
        self.packet += bytes
 
        if len(self.packet) == 3:
 
            p = self.packet
 
            self.packet = ""
 
@@ -64,13 +65,13 @@ class BCF2000(object):
 
            name = self.control[which]
 
            if name.startswith("button-"):
 
                value = value > 0
 
            self.lastValue[name] = value
 
            self.valueIn(name, value)
 
        else:
 
            print "unknown control %s to %s" % (which, value)
 
            print("unknown control %s to %s" % (which, value))
 

	
 
    def reopen(self):
 
        if self.dev is not None:
 
            try:
 
                self.dev.close()
 
            except IOError:
 
@@ -80,13 +81,13 @@ class BCF2000(object):
 
        self.dev = open(self.devPath, "r+")
 
        twisted.internet.fdesc.setNonBlocking(self.dev)
 
                    
 
    def valueIn(self, name, value):
 
        """override this with your handler for when events come in
 
        from the hardware"""
 
        print "slider %s to %s" % (name, value)
 
        print("slider %s to %s" % (name, value))
 
        if name == 'slider1':
 
            for x in range(2,8+1):
 
                v2 = int(64 + 64 * math.sin(x / 3 + value / 10))
 
                self.valueOut('slider%d' % x, v2)
 
            for x in range(1,8+1):
 
                self.valueOut('button-upper%s' % x, value > x*15)
 
@@ -98,13 +99,13 @@ class BCF2000(object):
 
            return
 

	
 
        value = int(value)
 
        if self.lastValue.get(name) == value:
 
            return
 
        self.lastValue[name] = value
 
        which = [k for k,v in self.control.items() if v == name]
 
        which = [k for k,v in list(self.control.items()) if v == name]
 
        assert len(which) == 1, "unknown control name %r" % name
 
        if name.startswith('button-'):
 
            value = value * 127
 
        #print "bcf: write %s %s" % (name, value)
 
        self.dev.write(chr(0xb0) + chr(which[0]) + chr(int(value)))
 

	
bin/keyboardcomposer
Show inline comments
 
@@ -6,12 +6,13 @@ from optparse import OptionParser
 
import webcolors, colorsys
 
from louie import dispatcher
 
from twisted.internet import reactor, tksupport
 
from twisted.web import resource
 
from rdflib import URIRef, Literal
 
import tkinter.tix as tk
 
from typing import Dict, Tuple, List
 

	
 
from light9.Fadable import Fadable
 
from light9.subclient import SubClient
 
from light9 import showconfig, networking, prof
 
from light9.uihelpers import toplevelat
 
from light9.namespaces import L9, RDF, RDFS
 
@@ -81,13 +82,13 @@ class SubmasterBox(tk.Frame):
 

	
 
    def __init__(self, master, graph, sub, session, col, row):
 
        self.graph = graph
 
        self.sub = sub
 
        self.session = session
 
        self.col, self.row = col, row
 
        bg = self.graph.value(sub, L9.color, default='#000000')
 
        bg = self.graph.value(sub, L9['color'], default='#000000')
 
        rgb = webcolors.hex_to_rgb(bg)
 
        hsv = colorsys.rgb_to_hsv(*[x / 255 for x in rgb])
 
        darkBg = webcolors.rgb_to_hex(
 
            tuple([
 
                int(x * 255) for x in colorsys.hsv_to_rgb(hsv[0], hsv[1], .2)
 
            ]))
 
@@ -183,15 +184,15 @@ class KeyboardComposer(tk.Frame, SubClie
 
    def __init__(self, root, graph, session, hw_sliders=True):
 
        tk.Frame.__init__(self, root, bg='black')
 
        SubClient.__init__(self)
 
        self.graph = graph
 
        self.session = session
 

	
 
        self.subbox = {}  # sub uri : SubmasterBox
 
        self.slider_table = {}  # coords : SubmasterBox
 
        self.rows = []  # this holds Tk Frames for each row
 
        self.subbox: Dict[URIRef, SubmasterBox] = {}  # sub uri : SubmasterBox
 
        self.slider_table: Dict[Tuple[int, int], SubmasterBox] = {}  # coords : SubmasterBox
 
        self.rows: List[tk.Frame] = []  # this holds Tk Frames for each row
 

	
 
        self.current_row = 0  # should come from session graph
 

	
 
        self.use_hw_sliders = hw_sliders
 
        self.connect_to_hw(hw_sliders)
 

	
 
@@ -267,13 +268,13 @@ class KeyboardComposer(tk.Frame, SubClie
 
                               self.graph.value(effect, L9['order']),
 
                               self.graph.label(effect), effect))
 
        withgroups.sort()
 

	
 
        log.info("withgroups %s", withgroups)
 

	
 
        self.effectEval = {}
 
        self.effectEval: Dict[URIRef, light9.effect.effecteval.EffectEval] = {}
 
        imp.reload(light9.effect.effecteval)
 
        simpleOutputs = SimpleOutputs(self.graph)
 
        for group, order, sortLabel, effect in withgroups:
 
            if col == 0 or group != last_group:
 
                row = self.make_row(group)
 
                rowcount += 1
bin/rdfdb
Show inline comments
 
#!bin/python
 
import run_local  # noqa
 
import os
 
from rdflib import URIRef
 
from light9 import networking, showconfig
 
import rdfdb.service
 

	
 
rdfdb.service.main(
 
    dirUriMap={
 
        os.environ['LIGHT9_SHOW'].rstrip('/') + '/': showconfig.showUri() + '/'
 
        os.environ['LIGHT9_SHOW'].encode('ascii').rstrip(b'/') + b'/':
 
        URIRef(showconfig.showUri() + '/')
 
    },
 
    prefixes={
 
        'show': showconfig.showUri() + '/',
 
        '': 'http://light9.bigasterisk.com/',
 
        'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
 
        'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
 
        'xsd': 'http://www.w3.org/2001/XMLSchema#',
 
        'effect': 'http://light9.bigasterisk.com/effect/',
 
        'dev': 'http://light9.bigasterisk.com/device/',
 
        'show': URIRef(showconfig.showUri() + '/'),
 
        '': URIRef('http://light9.bigasterisk.com/'),
 
        'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'),
 
        'rdfs': URIRef('http://www.w3.org/2000/01/rdf-schema#'),
 
        'xsd': URIRef('http://www.w3.org/2001/XMLSchema#'),
 
        'effect': URIRef('http://light9.bigasterisk.com/effect/'),
 
        'dev': URIRef('http://light9.bigasterisk.com/device/'),
 
    },
 
    port=networking.rdfdb.port,
 
)
light9/collector/collector_client.py
Show inline comments
 
@@ -8,13 +8,12 @@ import treq
 
log = logging.getLogger('coll_client')
 

	
 
_zmqClient = None
 

	
 

	
 
class TwistedZmqClient(object):
 

	
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 

	
 
    def send(self, msg):
light9/effect/effecteval.py
Show inline comments
 
from rdflib import Literal
 
from rdflib import Literal, URIRef
 
from light9.namespaces import L9, DEV
 
from webcolors import rgb_to_hex, hex_to_rgb
 
from colorsys import hsv_to_rgb
 
import math
 
from noise import pnoise1
 
import logging
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.scale import scale
 
from typing import Dict, Tuple, Any
 
import random
 
random.seed(0)
 
print("reload effecteval")
 

	
 
log = logging.getLogger('effecteval')
 

	
 
@@ -73,13 +74,13 @@ class EffectEval(object):
 

	
 
        strength = float(effectSettings[L9['strength']])
 
        if strength <= 0:
 
            return DeviceSettings(self.graph, []), {'zero': True}
 

	
 
        report = {}
 
        out = {}  # (dev, attr): value
 
        out: Dict[Tuple[URIRef, URIRef], Any] = {}  # (dev, attr): value
 

	
 
        out.update(
 
            self.simpleOutputs.values(
 
                self.effect, strength,
 
                effectSettings.get(L9['colorScale'], None)))
 

	
light9/effect/sequencer.py
Show inline comments
 
@@ -8,18 +8,20 @@ from twisted.internet import reactor
 
from twisted.internet import defer
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 
import cyclone.sse
 
import logging, bisect, time
 
import traceback
 
from typing import Any, Callable, Dict, List, Tuple
 

	
 
from light9.namespaces import L9, RDF
 
from light9.vidref.musictime import MusicTime
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from rdfdb.syncedgraph import SyncedGraph
 

	
 
from greplin import scales
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection(
 
@@ -35,29 +37,29 @@ class Note(object):
 

	
 
    def __init__(self, graph, uri, effectevalModule, simpleOutputs):
 
        g = self.graph = graph
 
        self.uri = uri
 
        self.effectEval = effectevalModule.EffectEval(
 
            graph, g.value(uri, L9['effectClass']), simpleOutputs)
 
        self.baseEffectSettings = {}  # {effectAttr: value}
 
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
 
        for s in g.objects(uri, L9['setting']):
 
            settingValues = dict(g.predicate_objects(s))
 
            ea = settingValues[L9['effectAttr']]
 
            self.baseEffectSettings[ea] = settingValues[L9['value']]
 

	
 
        def floatVal(s, p):
 
            return float(g.value(s, p).toPython())
 

	
 
        originTime = floatVal(uri, L9['originTime'])
 
        self.points = []
 
        self.points: List[Tuple[float, float]] = []
 
        for curve in g.objects(uri, L9['curve']):
 
            self.points.extend(
 
                self.getCurvePoints(curve, L9['strength'], originTime))
 
        self.points.sort()
 

	
 
    def getCurvePoints(self, curve, attr, originTime):
 
    def getCurvePoints(self, curve, attr, originTime) -> List[Tuple[float, float]]:
 
        points = []
 
        po = list(self.graph.predicate_objects(curve))
 
        if dict(po).get(L9['attr'], None) != attr:
 
            return []
 
        for point in [row[1] for row in po if row[0] == L9['point']]:
 
            po2 = dict(self.graph.predicate_objects(point))
 
@@ -126,22 +128,23 @@ class CodeWatcher(object):
 
        # in case we got an event at the start of the write
 
        reactor.callLater(.1, go)
 

	
 

	
 
class Sequencer(object):
 

	
 
    def __init__(self, graph, sendToCollector, fps=40):
 
    def __init__(self, graph: SyncedGraph, sendToCollector: Callable[[DeviceSettings], None],
 
                 fps=40):
 
        self.graph = graph
 
        self.fps = fps
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 

	
 
        self.recentUpdateTimes = []
 
        self.lastStatLog = 0
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
        self._compileGraphCall = None
 
        self.notes = {}  # song: [notes]
 
        self.notes: Dict[URIRef, List[Note]] = {}  # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.updateLoop()
 

	
 
        self.codeWatcher = CodeWatcher(
 
            onChange=lambda: self.graph.addHandler(self.compileGraph))
 
@@ -163,13 +166,13 @@ class Sequencer(object):
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            self.notes[song].append(
 
                Note(self.graph, note, effecteval, self.simpleOutputs))
 
        log.info('  compile %s took %.2f ms', song, 1000 * (time.time() - t1))
 

	
 
    def updateLoop(self):
 
    def updateLoop(self) -> None:
 
        # print "updateLoop"
 
        now = time.time()
 
        self.recentUpdateTimes = self.recentUpdateTimes[-40:] + [now]
 
        stats.recentFps = len(self.recentUpdateTimes) / (
 
            self.recentUpdateTimes[-1] - self.recentUpdateTimes[0] + .0001)
 
        if now > self.lastStatLog + .2:
 
@@ -229,13 +232,13 @@ class Sequencer(object):
 

	
 

	
 
class Updates(cyclone.sse.SSEHandler):
 

	
 
    def __init__(self, application, request, **kwargs):
 
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
        self.state = {}
 
        self.state: Dict = {}
 
        dispatcher.connect(self.updateState, 'state')
 
        self.numConnected = 0
 

	
 
    def updateState(self, update):
 
        self.state.update(update)
 

	
light9/effect/settings.py
Show inline comments
 
@@ -7,26 +7,27 @@ import decimal
 
import numpy
 
from rdflib import URIRef, Literal
 
from light9.namespaces import RDF, L9
 
import logging
 
log = logging.getLogger('settings')
 
from light9.collector.device import resolve
 

	
 
from typing import Sequence, Dict, Union, List
 

	
 
def parseHex(h):
 
    if h[0] != '#': raise ValueError(h)
 
    return [int(h[i:i + 2], 16) for i in (1, 3, 5)]
 

	
 

	
 
def parseHexNorm(h):
 
    return [x / 255 for x in parseHex(h)]
 

	
 

	
 
def toHex(rgbFloat):
 
    return '#%02x%02x%02x' % tuple(
 
        max(0, min(255, int(v * 255))) for v in rgbFloat)
 
def toHex(rgbFloat: Sequence[float]) -> str:
 
    assert len(rgbFloat) == 3
 
    scaled = (max(0, min(255, int(v * 255))) for v in rgbFloat)
 
    return '#%02x%02x%02x' % tuple(scaled) # type: ignore
 

	
 

	
 
def getVal(graph, subj):
 
    lit = graph.value(subj, L9['value']) or graph.value(subj, L9['scaledValue'])
 
    ret = lit.toPython()
 
    if isinstance(ret, decimal.Decimal):
 
@@ -39,13 +40,13 @@ class _Settings(object):
 
    default values are 0 or '#000000'. Internal rep must not store zeros or some
 
    comparisons will break.
 
    """
 

	
 
    def __init__(self, graph, settingsList):
 
        self.graph = graph  # for looking up all possible attrs
 
        self._compiled = {}  # dev: { attr: val }; val is number or colorhex
 
        self._compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {}  # dev: { attr: val }; val is number or colorhex
 
        for row in settingsList:
 
            self._compiled.setdefault(row[0], {})[row[1]] = row[2]
 
        # self._compiled may not be final yet- see _fromCompiled
 
        self._delZeros()
 

	
 
    @classmethod
 
@@ -65,13 +66,13 @@ class _Settings(object):
 
                v = getVal(g, s)
 
                settingsList.append((d, da, v))
 
        return cls(graph, settingsList)
 

	
 
    @classmethod
 
    def fromVector(cls, graph, vector, deviceAttrFilter=None):
 
        compiled = {}
 
        compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {}
 
        i = 0
 
        for (d, a) in cls(graph, [])._vectorKeys(deviceAttrFilter):
 
            if a == L9['color']:
 
                v = toHex(vector[i:i + 3])
 
                i += 3
 
            else:
 
@@ -183,13 +184,13 @@ class _Settings(object):
 
        return out
 

	
 
    def devices(self):
 
        return list(self._compiled.keys())
 

	
 
    def toVector(self, deviceAttrFilter=None):
 
        out = []
 
        out: List[float] = []
 
        for dev, attr in self._vectorKeys(deviceAttrFilter):
 
            v = self.getValue(dev, attr)
 
            if attr == L9['color']:
 
                out.extend(parseHexNorm(v))
 
            else:
 
                out.append(v)
light9/effect/simple_outputs.py
Show inline comments
 
import traceback
 
from light9.namespaces import L9, RDF
 
from light9.effect.scale import scale
 

	
 
from typing import Dict, List, Tuple, Any
 
from rdflib import URIRef
 

	
 
class SimpleOutputs(object):
 

	
 
    def __init__(self, graph):
 
        self.graph = graph
 

	
 
        # effect : [(dev, attr, value, isScaled)]
 
        self.effectOutputs = {}
 
        self.effectOutputs: Dict[URIRef, List[Tuple[URIRef, URIRef, Any, bool]]] = {}
 

	
 
        self.graph.addHandler(self.updateEffectsFromGraph)
 

	
 
    def updateEffectsFromGraph(self):
 
        for effect in self.graph.subjects(RDF.type, L9['Effect']):
 
            settings = []
light9/namespaces.py
Show inline comments
 
from rdflib import Namespace, RDF, RDFS  # noqa
 
from typing import Dict
 

	
 

	
 
# Namespace was showing up in profiles
 
class FastNs(object):
 

	
 
    def __init__(self, base):
 
        self.ns = Namespace(base)
 
        self.cache = {}
 
        self.cache: Dict[str, Namespace] = {}
 

	
 
    def __getitem__(self, term):
 
        if term not in self.cache:
 
            self.cache[term] = self.ns[term]
 
        return self.cache[term]
 

	
light9/networking.py
Show inline comments
 
from urllib.parse import urlparse
 
from urllib.parse import splitport
 
from .showconfig import getGraph, showUri
 
from .namespaces import L9
 

	
 

	
 
class ServiceAddress(object):
 

	
 
@@ -17,21 +16,17 @@ class ServiceAddress(object):
 
            raise ValueError("no url for %s -> %s -> %s" %
 
                             (showUri(), L9['networking'], self.service))
 
        return str(ret)
 

	
 
    @property
 
    def port(self):
 
        _, netloc, _, _, _, _ = urlparse(self._url())
 
        host, port = splitport(netloc)
 
        return int(port)
 
        return urlparse(self._url()).port
 

	
 
    @property
 
    def host(self):
 
        _, netloc, _, _, _, _ = urlparse(self._url())
 
        host, port = splitport(netloc)
 
        return host
 
        return urlparse(self._url()).hostname
 

	
 
    @property
 
    def url(self):
 
        return self._url()
 

	
 
    value = url
light9/prof.py
Show inline comments
 
import sys, traceback, time, logging
 
from typing import Any, Dict
 
log = logging.getLogger()
 

	
 

	
 
def run(main, profile=None):
 
    if not profile:
 
        main()
 
@@ -27,13 +28,13 @@ def watchPoint(filename, lineno, event="
 
    """whenever we hit this line, print a stack trace. event='call'
 
    for lines that are function definitions, like what a profiler
 
    gives you.
 

	
 
    Switch to 'line' to match lines inside functions. Execution speed
 
    will be much slower."""
 
    seenTraces = {}  # trace contents : count
 
    seenTraces: Dict[Any, int] = {}  # trace contents : count
 

	
 
    def trace(frame, ev, arg):
 
        if ev == event:
 
            if (frame.f_code.co_filename, frame.f_lineno) == (filename, lineno):
 
                stack = ''.join(traceback.format_stack(frame))
 
                if stack not in seenTraces:
light9/showconfig.py
Show inline comments
 
import logging, warnings
 
from twisted.python.filepath import FilePath
 
from os import path, getenv
 
from rdflib import Graph
 
from rdflib import URIRef
 
from rdflib import URIRef, Literal
 
from .namespaces import L9
 
from typing import List, cast
 
log = logging.getLogger('showconfig')
 

	
 
_config = None  # graph
 

	
 

	
 
def getGraph():
 
def getGraph() -> Graph:
 
    warnings.warn(
 
        "code that's using showconfig.getGraph should be "
 
        "converted to use the sync graph",
 
        stacklevel=2)
 
    global _config
 
    if _config is None:
 
@@ -24,70 +25,72 @@ def getGraph():
 
                root()).globChildren("build/*.n3"):
 
            graph.parse(location=f.path, format='n3')
 
        _config = graph
 
    return _config
 

	
 

	
 
def root():
 
def root() -> bytes:
 
    r = getenv("LIGHT9_SHOW")
 
    if r is None:
 
        raise OSError(
 
            "LIGHT9_SHOW env variable has not been set to the show root")
 
    return r
 
    return r.encode('ascii')
 

	
 

	
 
_showUri = None
 

	
 

	
 
def showUri():
 
def showUri() -> URIRef:
 
    """Return the show URI associated with $LIGHT9_SHOW."""
 
    global _showUri
 
    if _showUri is None:
 
        _showUri = URIRef(open(path.join(root(), 'URI')).read().strip())
 
        _showUri = URIRef(open(path.join(root(), b'URI')).read().strip())
 
    return _showUri
 

	
 

	
 
def songOnDisk(song):
 
def songOnDisk(song: URIRef) -> bytes:
 
    """given a song URI, where's the on-disk file that mpd would read?"""
 
    graph = getGraph()
 
    root = graph.value(showUri(), L9['musicRoot'])
 
    if not root:
 
        raise ValueError("%s has no :musicRoot" % showUri())
 

	
 
    name = graph.value(song, L9['songFilename'])
 
    if not name:
 
        raise ValueError("Song %r has no :songFilename" % song)
 

	
 
    return path.abspath(path.join(root, name))
 
    return path.abspath(path.join(
 
        cast(Literal, root).toPython(),
 
        cast(Literal, name).toPython()))
 

	
 

	
 
def songFilenameFromURI(uri):
 
def songFilenameFromURI(uri: URIRef) -> bytes:
 
    """
 
    'http://light9.bigasterisk.com/show/dance2007/song8' -> 'song8'
 

	
 
    everything that uses this should be deprecated for real URIs
 
    everywhere"""
 
    assert isinstance(uri, URIRef)
 
    return uri.split('/')[-1]
 
    return str(uri).split('/')[-1].encode('ascii')
 

	
 

	
 
def getSongsFromShow(graph, show):
 
def getSongsFromShow(graph: Graph, show: URIRef) -> List[URIRef]:
 
    playList = graph.value(show, L9['playList'])
 
    if not playList:
 
        raise ValueError("%r has no l9:playList" % show)
 
    # The patch in https://github.com/RDFLib/rdflib/issues/305 fixed a
 
    # serious bug here.
 
    songs = list(graph.items(playList))
 

	
 
    return songs
 

	
 

	
 
def curvesDir():
 
    return path.join(root(), "curves")
 
    return path.join(root(), b"curves")
 

	
 

	
 
def subFile(subname):
 
    return path.join(root(), "subs", subname)
 
    return path.join(root(), b"subs", subname)
 

	
 

	
 
def subsDir():
 
    return path.join(root(), 'subs')
 
    return path.join(root(), b'subs')
light9/tkdnd.py
Show inline comments
 
from glob import glob
 
from os.path import join, basename
 

	
 
from typing import Dict, Any
 

	
 
class TkdndEvent(object):
 
    """
 
    see http://www.ellogon.org/petasis/tcltk-projects/tkdnd/tkdnd-man-page
 
    for details on the fields
 

	
 
@@ -42,13 +42,13 @@ class TkdndEvent(object):
 

	
 

	
 
class Hover(object):
 

	
 
    def __init__(self, widget, style):
 
        self.widget, self.style = widget, style
 
        self.oldStyle = {}
 
        self.oldStyle: Dict[Any, Any] = {}
 

	
 
    def set(self, ev):
 
        for k, v in list(self.style.items()):
 
            self.oldStyle[k] = self.widget.cget(k)
 
        self.widget.configure(**self.style)
 
        return ev.action
light9/uihelpers.py
Show inline comments
 
@@ -3,12 +3,13 @@
 
#from Tkinter import Button
 
import logging, time
 
from rdflib import Literal
 
from tkinter.tix import Button, Toplevel, Tk, IntVar, Entry, DoubleVar
 
import tkinter
 
from light9.namespaces import L9
 
from typing import Dict
 

	
 
log = logging.getLogger("toplevel")
 

	
 
windowlocations = {
 
    'sub': '425x738+00+00',
 
    'console': '168x24+848+000',
 
@@ -41,22 +42,22 @@ def toplevel_savegeometry(tl, name):
 
def toplevelat(name, existingtoplevel=None, graph=None, session=None):
 
    tl = existingtoplevel or Toplevel()
 
    tl.title(name)
 

	
 
    lastSaved = [None]
 
    setOnce = [False]
 
    graphSetTime = [0]
 
    graphSetTime = [0.0]
 

	
 
    def setPosFromGraphOnce():
 
        """
 
        the graph is probably initially empty, but as soon as it gives
 
        us one window position, we stop reading them
 
        """
 
        if setOnce[0]:
 
            return
 
        geo = graph.value(session, L9.windowGeometry)
 
        geo = graph.value(session, L9['windowGeometry'])
 
        log.debug("setPosFromGraphOnce %s", geo)
 

	
 
        setOnce[0] = True
 
        graphSetTime[0] = time.time()
 
        if geo is not None and geo != lastSaved[0]:
 
            tl.geometry(geo)
 
@@ -73,13 +74,13 @@ def toplevelat(name, existingtoplevel=No
 
        if graphSetTime[0] == 0 or time.time() < graphSetTime[0] + 3:
 
            return
 
        if not setOnce[0]:
 
            return
 
        lastSaved[0] = geo
 
        log.debug("saving position %s", geo)
 
        graph.patchObject(session, session, L9.windowGeometry, Literal(geo))
 
        graph.patchObject(session, session, L9['windowGeometry'], Literal(geo))
 

	
 
    if graph is not None and session is not None:
 
        graph.addHandler(setPosFromGraphOnce)
 

	
 
    if name in windowlocations:
 
        tl.geometry(positionOnCurrentDesktop(windowlocations[name]))
 
@@ -139,21 +140,21 @@ def colorlabel(label):
 
    """color a label based on its own text"""
 
    txt = label['text'] or "0"
 
    lev = float(txt) / 100
 
    low = (80, 80, 180)
 
    high = (255, 55, 0o50)
 
    out = [int(l + lev * (h - l)) for h, l in zip(high, low)]
 
    col = "#%02X%02X%02X" % tuple(out)
 
    col = "#%02X%02X%02X" % tuple(out) # type: ignore
 
    label.config(bg=col)
 

	
 

	
 
# TODO: get everyone to use this
 
def colorfade(low, high, percent):
 
    '''not foolproof.  make sure 0 < percent < 1'''
 
    out = [int(l + percent * (h - l)) for h, l in zip(high, low)]
 
    col = "#%02X%02X%02X" % tuple(out)
 
    col = "#%02X%02X%02X" % tuple(out) # type: ignore
 
    return col
 

	
 

	
 
def colortotuple(anytkobj, colorname):
 
    'pass any tk object and a color name, like "yellow"'
 
    rgb = anytkobj.winfo_rgb(colorname)
 
@@ -213,14 +214,14 @@ class Togglebutton(Button):
 

	
 

	
 
class FancyDoubleVar(DoubleVar):
 

	
 
    def __init__(self, master=None):
 
        DoubleVar.__init__(self, master)
 
        self.callbacklist = {}  # cbname : mode
 
        self.namedtraces = {}  # name : cbname
 
        self.callbacklist: Dict[str, str] = {}  # cbname : mode
 
        self.namedtraces: Dict[str, str] = {}  # name : cbname
 

	
 
    def trace_variable(self, mode, callback):
 
        """Define a trace callback for the variable.
 

	
 
        MODE is one of "r", "w", "u" for read, write, undefine.
 
        CALLBACK must be a function which is called when
light9/vidref/musictime.py
Show inline comments
 
import time, json, logging
 
from light9 import networking
 
from twisted.internet import reactor
 
from cyclone.httpclient import fetch
 
from typing import Dict
 
log = logging.getLogger()
 

	
 

	
 
class MusicTime(object):
 
    """
 
    fetch times from ascoltami in a background thread; return times
 
@@ -25,13 +26,13 @@ class MusicTime(object):
 
        end of a song)
 
        """
 
        self.period = period
 
        self.hoverPeriod = .05
 
        self.onChange = onChange
 

	
 
        self.position = {}
 
        self.position: Dict[str, float] = {}
 
        # driven by our pollCurvecalcTime and also by Gui.incomingTime
 
        self.lastHoverTime = None  # None means "no recent value"
 
        self.pollMusicTime()
 
        if pollCurvecalc:
 
            self.pollCurvecalcTime()
 

	
 
@@ -122,9 +123,9 @@ class MusicTime(object):
 

	
 
    def sendTime(self, t):
 
        """request that the player go to this time"""
 
        fetch(
 
            method=b'POST',
 
            url=networking.musicPlayer.path('time'),
 
            body=json.dumps({"t": t}),
 
            postdata=json.dumps({"t": t}).encode('utf8'),
 
            headers={b"content-type": [b"application/json"]},
 
        )
light9/web/websocket.js
Show inline comments
 
/*
 
  url is now relative to the window location
 
  url is now relative to the window location. Note that nginx may drop
 
  the connection after 60sec of inactivity.
 
*/
 
function reconnectingWebSocket(url, onMessage) {
 
    var pong = 0;
 
    
 
    var fullUrl = (
 
        "ws://"
requirements.txt
Show inline comments
 
@@ -28,12 +28,13 @@ yapf==0.27.0
 

	
 
coverage==4.3.4
 
ipdb==0.10.2
 
ipython==5.3.0
 
mypy==0.701
 
flake8
 
typing_extensions
 

	
 
git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
 
git+http://github.com/11craft/louie.git@f18bb71010c114eca9c6b88c96453340e3b39454#egg=louie
 
git+http://github.com/webpy/webpy@ace0f8954c28311004b33c7a513c6e40a3c02470#egg=web
 
https://github.com/drewp/cyclone/archive/python3.zip#egg=cyclone
 

	
tasks.py
Show inline comments
 
@@ -27,24 +27,27 @@ bin_sources = [
 
    ]
 
def pkg_sources():
 
    return glob.glob('light9/**/*.py', recursive=True)
 

	
 
@task
 
def mypy(ctx):
 
    print('\n\n')
 
    def run(sources):
 
        ss = ' '.join(sources)
 
        ctx.run(f'MYPYPATH=stubs env/bin/mypy --check-untyped-defs {ss}',
 
        ctx.run(f'MYPYPATH=stubs:/my/proj/rdfdb env/bin/mypy --check-untyped-defs {ss}',
 
                pty=True, warn=True)
 

	
 
    sources = ' '.join(bin_sources + pkg_sources())
 
    ctx.run(f'env/bin/flake8 --ignore=E115,E123,E124,E126,E225,E231,E261,E262,E265,E301,E302,E303,E305,E306,E401,E402,E501,E701,E731,W291,W293,W391,W504 {sources}', warn=True)
 

	
 
    sources = ' '.join(pkg_sources())
 
    for src in bin_sources:
 
        print(f"mypy {src}")
 
        run([src])# + pkg_sources())
 
    run(['bin/rdfdb'])
 
    run(['bin/keyboardcomposer'])
 
    #for src in bin_sources:
 
    #    print(f"mypy {src}")
 
    #    run([src])# + pkg_sources())
 
@task
 
def reformat(ctx):
 
    ctx.run("env/bin/yapf --verbose --parallel --in-place --style google light9/**/*.py `file --no-pad  bin/* | grep 'Python script' | perl -lpe 's/:.*//'`")
 
    
 
@task
 
def test(ctx):
0 comments (0 inline, 0 general)