Changeset - ddc9a5ef213a
[Not reviewed]
default
0 3 0
Drew Perttula - 6 years ago 2019-06-01 20:07:47
drewp@bigasterisk.com
type hints
Ignore-this: 4bcd306b98d68f93a61fdabd53e6eccc
3 files changed with 29 insertions and 24 deletions:
0 comments (0 inline, 0 general)
light9/effect/sequencer.py
Show inline comments
 
@@ -8,15 +8,16 @@ from twisted.internet import reactor
 
from twisted.internet import defer
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 
import cyclone.sse
 
import logging, bisect, time
 
import traceback
 
from typing import Any, Callable, Dict, List, Tuple
 
from typing import Any, Callable, Dict, List, Tuple, cast
 

	
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
 
from light9.vidref.musictime import MusicTime
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from rdfdb.syncedgraph import SyncedGraph
 

	
 
@@ -32,13 +33,13 @@ stats = scales.collection(
 
    scales.DoubleStat('recentFps'),
 
)
 

	
 

	
 
class Note(object):
 

	
 
    def __init__(self, graph, uri, effectevalModule, simpleOutputs):
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs):
 
        g = self.graph = graph
 
        self.uri = uri
 
        self.effectEval = effectevalModule.EffectEval(
 
            graph, g.value(uri, L9['effectClass']), simpleOutputs)
 
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
 
        for s in g.objects(uri, L9['setting']):
 
@@ -53,28 +54,28 @@ class Note(object):
 
        self.points: List[Tuple[float, float]] = []
 
        for curve in g.objects(uri, L9['curve']):
 
            self.points.extend(
 
                self.getCurvePoints(curve, L9['strength'], originTime))
 
        self.points.sort()
 

	
 
    def getCurvePoints(self, curve, attr,
 
                       originTime) -> List[Tuple[float, float]]:
 
    def getCurvePoints(self, curve: Curve, attr,
 
                       originTime: float) -> List[Tuple[float, float]]:
 
        points = []
 
        po = list(self.graph.predicate_objects(curve))
 
        if dict(po).get(L9['attr'], None) != attr:
 
            return []
 
        for point in [row[1] for row in po if row[0] == L9['point']]:
 
            po2 = dict(self.graph.predicate_objects(point))
 
            points.append(
 
                (originTime + float(po2[L9['time']]), float(po2[L9['value']])))
 
        return points
 

	
 
    def activeAt(self, t):
 
    def activeAt(self, t: float) -> bool:
 
        return self.points[0][0] <= t <= self.points[-1][0]
 

	
 
    def evalCurve(self, t):
 
    def evalCurve(self, t: float) -> float:
 
        i = bisect.bisect_left(self.points, (t, None)) - 1
 

	
 
        if i == -1:
 
            return self.points[0][1]
 
        if self.points[i][0] > t:
 
            return self.points[i][1]
 
@@ -83,13 +84,13 @@ class Note(object):
 

	
 
        p1, p2 = self.points[i], self.points[i + 1]
 
        frac = (t - p1[0]) / (p2[0] - p1[0])
 
        y = p1[1] + (p2[1] - p1[1]) * frac
 
        return y
 

	
 
    def outputSettings(self, t):
 
    def outputSettings(self, t: float) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
 
        """
 
        list of (device, attr, value), and a report for web
 
        """
 
        report = {
 
            'note': str(self.uri),
 
            'effectClass': self.effectEval.effect,
 
@@ -150,23 +151,25 @@ class Sequencer(object):
 
        self.updateLoop()
 

	
 
        self.codeWatcher = CodeWatcher(
 
            onChange=lambda: self.graph.addHandler(self.compileGraph))
 

	
 
    @stats.compileGraph.time()
 
    def compileGraph(self):
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        t1 = time.time()
 
        g = self.graph
 

	
 
        for song in g.subjects(RDF.type, L9['Song']):
 
            self.graph.addHandler(lambda song=song: self.compileSong(song))
 
            def compileSong(song: Song = cast(Song, song)) -> None:
 
                self.compileSong(song)
 
            self.graph.addHandler(compileSong)
 
        log.info('compileGraph took %.2f ms', 1000 * (time.time() - t1))
 

	
 
    @stats.compileSong.time()
 
    def compileSong(self, song):
 
    def compileSong(self, song: Song) -> None:
 
        t1 = time.time()
 

	
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            self.notes[song].append(
 
                Note(self.graph, note, effecteval, self.simpleOutputs))
 
@@ -190,13 +193,13 @@ class Sequencer(object):
 
                    ]),
 
                    'recentFps':
 
                    stats.recentFps
 
                })
 
            self.lastStatLog = now
 

	
 
        def done(sec):
 
        def done(sec: float):
 
            # print "sec", sec
 
            # delay = max(0, time.time() - (now + 1 / self.fps))
 
            # print 'cl', delay
 
            delay = 0.005
 
            reactor.callLater(delay, self.updateLoop)
 

	
 
@@ -205,57 +208,54 @@ class Sequencer(object):
 
            reactor.callLater(2, self.updateLoop)
 

	
 
        d = self.update()
 
        d.addCallbacks(done, err)
 

	
 
    @stats.update.time()
 
    def update(self):
 
        # print "update"
 
    def update(self) -> None:
 
        try:
 
            musicState = self.music.getLatest()
 
            song = URIRef(
 
                musicState['song']) if musicState.get('song') else None
 
            if 't' not in musicState:
 
            if 'song' not in musicState or 't' not in musicState:
 
                return defer.succeed(0)
 
            t = musicState['t']
 
            dispatcher.send('state', update={'song': str(song), 't': t})
 
            song = Song(musicState['song'])
 
            dispatcher.send('state', update={'song': str(song), 't': musicState['t']})
 

	
 
            settings = []
 
            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
 
            noteReports = []
 
            for note in songNotes:
 
                s, report = note.outputSettings(t)
 
                s, report = note.outputSettings(musicState['t'])
 
                noteReports.append(report)
 
                settings.append(s)
 
            dispatcher.send('state', update={'songNotes': noteReports})
 
            return self.sendToCollector(
 
                DeviceSettings.fromList(self.graph, settings))
 
        except Exception:
 
            traceback.print_exc()
 
            raise
 

	
 

	
 
class Updates(cyclone.sse.SSEHandler):
 

	
 
    def __init__(self, application, request, **kwargs):
 
    def __init__(self, application, request, **kwargs) -> None:
 
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
        self.state: Dict = {}
 
        dispatcher.connect(self.updateState, 'state')
 
        self.numConnected = 0
 

	
 
    def updateState(self, update):
 
    def updateState(self, update: Dict):
 
        self.state.update(update)
 

	
 
    def bind(self):
 
    def bind(self) -> None:
 
        self.numConnected += 1
 

	
 
        if self.numConnected == 1:
 
            self.loop()
 

	
 
    def loop(self):
 
    def loop(self) -> None:
 
        if self.numConnected == 0:
 
            return
 
        self.sendEvent(self.state)
 
        reactor.callLater(.1, self.loop)
 

	
 
    def unbind(self):
 
    def unbind(self) -> None:
 
        self.numConnected -= 1
light9/newtypes.py
Show inline comments
 
from typing import Tuple, NewType
 
from rdflib import URIRef
 

	
 
ClientType = NewType('ClientType', str)
 
ClientSessionType = NewType('ClientSessionType', str)
 
Curve = NewType('Curve', URIRef)
 
OutputUri = NewType('OutputUri', URIRef)  # e.g. dmxA
 
DeviceUri = NewType('DeviceUri', URIRef)  # e.g. :aura2
 
DeviceClass = NewType('DeviceClass', URIRef)  # e.g. :Aura
 
DmxIndex = NewType('DmxIndex', int)  # 1..512
 
DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
 
DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
 
NoteUri = NewType('NoteUri', URIRef)
 
OutputAttr = NewType('OutputAttr', URIRef)  # e.g. :xFine
 
OutputValue = NewType('OutputValue', int)  # byte in dmx message
 
Song = NewType('Song', URIRef)
 
UnixTime = NewType('UnixTime', float)
 

	
 
# Alternate output range for a device. Instead of outputting 0.0 to
 
# 1.0, you can map that range into, say, 0.2 to 0.7
 
OutputRange = NewType('OutputRange', Tuple[float, float])
tasks.py
Show inline comments
 
@@ -40,12 +40,14 @@ def mypy(ctx):
 
    ctx.run(f'env/bin/flake8 --ignore=E115,E123,E124,E126,E225,E231,E261,E262,E265,E301,E302,E303,E305,E306,E401,E402,E501,E701,E731,W291,W293,W391,W504 {sources}', warn=True)
 

	
 
    sources = ' '.join(pkg_sources())
 
    run(['bin/collector'])
 
    run(['bin/rdfdb'])
 
    run(['bin/keyboardcomposer'])
 
    run(['bin/effectsequencer'])
 
    run(['bin/ascoltami2'])
 
    #for src in bin_sources:
 
    #    print(f"mypy {src}")
 
    #    run([src])# + pkg_sources())
 
@task
 
def reformat(ctx):
 
    ctx.run("env/bin/yapf --verbose --parallel --in-place --style google light9/*.py light9/*/*.py `file --no-pad  bin/* | grep 'Python script' | perl -lpe 's/:.*//'`")
0 comments (0 inline, 0 general)