Changeset - ddc9a5ef213a
[Not reviewed]
default
0 3 0
Drew Perttula - 6 years ago 2019-06-01 20:07:47
drewp@bigasterisk.com
type hints
Ignore-this: 4bcd306b98d68f93a61fdabd53e6eccc
3 files changed with 29 insertions and 24 deletions:
0 comments (0 inline, 0 general)
light9/effect/sequencer.py
Show inline comments
 
'''
 
copies from effectloop.py, which this should replace
 
'''
 

	
 
from louie import dispatcher
 
from rdflib import URIRef
 
from twisted.internet import reactor
 
from twisted.internet import defer
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 
import cyclone.sse
 
import logging, bisect, time
 
import traceback
 
from typing import Any, Callable, Dict, List, Tuple
 
from typing import Any, Callable, Dict, List, Tuple, cast
 

	
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
 
from light9.vidref.musictime import MusicTime
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from rdfdb.syncedgraph import SyncedGraph
 

	
 
from greplin import scales
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection(
 
    '/sequencer/',
 
    scales.PmfStat('update'),
 
    scales.PmfStat('compileGraph'),
 
    scales.PmfStat('compileSong'),
 
    scales.DoubleStat('recentFps'),
 
)
 

	
 

	
 
class Note(object):
 

	
 
    def __init__(self, graph, uri, effectevalModule, simpleOutputs):
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs):
 
        g = self.graph = graph
 
        self.uri = uri
 
        self.effectEval = effectevalModule.EffectEval(
 
            graph, g.value(uri, L9['effectClass']), simpleOutputs)
 
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
 
        for s in g.objects(uri, L9['setting']):
 
            settingValues = dict(g.predicate_objects(s))
 
            ea = settingValues[L9['effectAttr']]
 
            self.baseEffectSettings[ea] = settingValues[L9['value']]
 

	
 
        def floatVal(s, p):
 
            return float(g.value(s, p).toPython())
 

	
 
        originTime = floatVal(uri, L9['originTime'])
 
        self.points: List[Tuple[float, float]] = []
 
        for curve in g.objects(uri, L9['curve']):
 
            self.points.extend(
 
                self.getCurvePoints(curve, L9['strength'], originTime))
 
        self.points.sort()
 

	
 
    def getCurvePoints(self, curve, attr,
 
                       originTime) -> List[Tuple[float, float]]:
 
    def getCurvePoints(self, curve: Curve, attr,
 
                       originTime: float) -> List[Tuple[float, float]]:
 
        points = []
 
        po = list(self.graph.predicate_objects(curve))
 
        if dict(po).get(L9['attr'], None) != attr:
 
            return []
 
        for point in [row[1] for row in po if row[0] == L9['point']]:
 
            po2 = dict(self.graph.predicate_objects(point))
 
            points.append(
 
                (originTime + float(po2[L9['time']]), float(po2[L9['value']])))
 
        return points
 

	
 
    def activeAt(self, t):
 
    def activeAt(self, t: float) -> bool:
 
        return self.points[0][0] <= t <= self.points[-1][0]
 

	
 
    def evalCurve(self, t):
 
    def evalCurve(self, t: float) -> float:
 
        i = bisect.bisect_left(self.points, (t, None)) - 1
 

	
 
        if i == -1:
 
            return self.points[0][1]
 
        if self.points[i][0] > t:
 
            return self.points[i][1]
 
        if i >= len(self.points) - 1:
 
            return self.points[i][1]
 

	
 
        p1, p2 = self.points[i], self.points[i + 1]
 
        frac = (t - p1[0]) / (p2[0] - p1[0])
 
        y = p1[1] + (p2[1] - p1[1]) * frac
 
        return y
 

	
 
    def outputSettings(self, t):
 
    def outputSettings(self, t: float) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
 
        """
 
        list of (device, attr, value), and a report for web
 
        """
 
        report = {
 
            'note': str(self.uri),
 
            'effectClass': self.effectEval.effect,
 
        }
 
        effectSettings = self.baseEffectSettings.copy()
 
        effectSettings[L9['strength']] = self.evalCurve(t)
 
        report['effectSettings'] = dict(
 
            (str(k), str(round(v, 4))) for k, v in sorted(effectSettings.items()))
 
        report['nonZero'] = effectSettings[L9['strength']] > 0
 
        out, evalReport = self.effectEval.outputFromEffect(
 
            list(effectSettings.items()),
 
            songTime=t,
 
            # note: not using origin here since it's going away
 
            noteTime=t - self.points[0][0])
 
        report['devicesAffected'] = len(out.devices())
 
        return out, report
 

	
 

	
 
class CodeWatcher(object):
 

	
 
    def __init__(self, onChange):
 
@@ -132,130 +133,129 @@ class CodeWatcher(object):
 

	
 
class Sequencer(object):
 

	
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], None],
 
                 fps=40):
 
        self.graph = graph
 
        self.fps = fps
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 

	
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
        self._compileGraphCall = None
 
        self.notes: Dict[URIRef, List[Note]] = {}  # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.updateLoop()
 

	
 
        self.codeWatcher = CodeWatcher(
 
            onChange=lambda: self.graph.addHandler(self.compileGraph))
 

	
 
    @stats.compileGraph.time()
 
    def compileGraph(self):
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        t1 = time.time()
 
        g = self.graph
 

	
 
        for song in g.subjects(RDF.type, L9['Song']):
 
            self.graph.addHandler(lambda song=song: self.compileSong(song))
 
            def compileSong(song: Song = cast(Song, song)) -> None:
 
                self.compileSong(song)
 
            self.graph.addHandler(compileSong)
 
        log.info('compileGraph took %.2f ms', 1000 * (time.time() - t1))
 

	
 
    @stats.compileSong.time()
 
    def compileSong(self, song):
 
    def compileSong(self, song: Song) -> None:
 
        t1 = time.time()
 

	
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            self.notes[song].append(
 
                Note(self.graph, note, effecteval, self.simpleOutputs))
 
        log.info('  compile %s took %.2f ms', song, 1000 * (time.time() - t1))
 

	
 
    def updateLoop(self) -> None:
 
        # print "updateLoop"
 
        now = time.time()
 
        self.recentUpdateTimes = self.recentUpdateTimes[-40:] + [now]
 
        stats.recentFps = len(self.recentUpdateTimes) / (
 
            self.recentUpdateTimes[-1] - self.recentUpdateTimes[0] + .0001)
 
        if now > self.lastStatLog + .2:
 
            dispatcher.send(
 
                'state',
 
                update={
 
                    'recentDeltas':
 
                    sorted([
 
                        round(t1 - t0, 4)
 
                        for t0, t1 in zip(self.recentUpdateTimes[:-1],
 
                                          self.recentUpdateTimes[1:])
 
                    ]),
 
                    'recentFps':
 
                    stats.recentFps
 
                })
 
            self.lastStatLog = now
 

	
 
        def done(sec):
 
        def done(sec: float):
 
            # print "sec", sec
 
            # delay = max(0, time.time() - (now + 1 / self.fps))
 
            # print 'cl', delay
 
            delay = 0.005
 
            reactor.callLater(delay, self.updateLoop)
 

	
 
        def err(e):
 
            log.warn('updateLoop: %r', e)
 
            reactor.callLater(2, self.updateLoop)
 

	
 
        d = self.update()
 
        d.addCallbacks(done, err)
 

	
 
    @stats.update.time()
 
    def update(self):
 
        # print "update"
 
    def update(self) -> None:
 
        try:
 
            musicState = self.music.getLatest()
 
            song = URIRef(
 
                musicState['song']) if musicState.get('song') else None
 
            if 't' not in musicState:
 
            if 'song' not in musicState or 't' not in musicState:
 
                return defer.succeed(0)
 
            t = musicState['t']
 
            dispatcher.send('state', update={'song': str(song), 't': t})
 
            song = Song(musicState['song'])
 
            dispatcher.send('state', update={'song': str(song), 't': musicState['t']})
 

	
 
            settings = []
 
            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
 
            noteReports = []
 
            for note in songNotes:
 
                s, report = note.outputSettings(t)
 
                s, report = note.outputSettings(musicState['t'])
 
                noteReports.append(report)
 
                settings.append(s)
 
            dispatcher.send('state', update={'songNotes': noteReports})
 
            return self.sendToCollector(
 
                DeviceSettings.fromList(self.graph, settings))
 
        except Exception:
 
            traceback.print_exc()
 
            raise
 

	
 

	
 
class Updates(cyclone.sse.SSEHandler):
 

	
 
    def __init__(self, application, request, **kwargs):
 
    def __init__(self, application, request, **kwargs) -> None:
 
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
        self.state: Dict = {}
 
        dispatcher.connect(self.updateState, 'state')
 
        self.numConnected = 0
 

	
 
    def updateState(self, update):
 
    def updateState(self, update: Dict):
 
        self.state.update(update)
 

	
 
    def bind(self):
 
    def bind(self) -> None:
 
        self.numConnected += 1
 

	
 
        if self.numConnected == 1:
 
            self.loop()
 

	
 
    def loop(self):
 
    def loop(self) -> None:
 
        if self.numConnected == 0:
 
            return
 
        self.sendEvent(self.state)
 
        reactor.callLater(.1, self.loop)
 

	
 
    def unbind(self):
 
    def unbind(self) -> None:
 
        self.numConnected -= 1
light9/newtypes.py
Show inline comments
 
from typing import Tuple, NewType
 
from rdflib import URIRef
 

	
 
ClientType = NewType('ClientType', str)
 
ClientSessionType = NewType('ClientSessionType', str)
 
Curve = NewType('Curve', URIRef)
 
OutputUri = NewType('OutputUri', URIRef)  # e.g. dmxA
 
DeviceUri = NewType('DeviceUri', URIRef)  # e.g. :aura2
 
DeviceClass = NewType('DeviceClass', URIRef)  # e.g. :Aura
 
DmxIndex = NewType('DmxIndex', int)  # 1..512
 
DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
 
DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
 
NoteUri = NewType('NoteUri', URIRef)
 
OutputAttr = NewType('OutputAttr', URIRef)  # e.g. :xFine
 
OutputValue = NewType('OutputValue', int)  # byte in dmx message
 
Song = NewType('Song', URIRef)
 
UnixTime = NewType('UnixTime', float)
 

	
 
# Alternate output range for a device. Instead of outputting 0.0 to
 
# 1.0, you can map that range into, say, 0.2 to 0.7
 
OutputRange = NewType('OutputRange', Tuple[float, float])
tasks.py
Show inline comments
 
@@ -22,37 +22,39 @@ bin_sources = [
 
        'bin/subcomposer',
 
        'bin/subserver',
 
        'bin/vidref',
 
        'bin/vidrefsetup',
 
        'bin/wavecurve',
 
    ]
 
def pkg_sources():
 
    return glob.glob('light9/**/*.py', recursive=True)
 

	
 
@task
 
def mypy(ctx):
 
    print('\n\n')
 
    def run(sources):
 
        ss = ' '.join(sources)
 
        ctx.run(f'MYPYPATH=stubs:/my/proj/rdfdb env/bin/mypy --check-untyped-defs {ss}',
 
                pty=True, warn=True)
 

	
 
    sources = ' '.join(bin_sources + pkg_sources())
 
    ctx.run(f'env/bin/flake8 --ignore=E115,E123,E124,E126,E225,E231,E261,E262,E265,E301,E302,E303,E305,E306,E401,E402,E501,E701,E731,W291,W293,W391,W504 {sources}', warn=True)
 

	
 
    sources = ' '.join(pkg_sources())
 
    run(['bin/collector'])
 
    run(['bin/rdfdb'])
 
    run(['bin/keyboardcomposer'])
 
    run(['bin/effectsequencer'])
 
    run(['bin/ascoltami2'])
 
    #for src in bin_sources:
 
    #    print(f"mypy {src}")
 
    #    run([src])# + pkg_sources())
 
@task
 
def reformat(ctx):
 
    ctx.run("env/bin/yapf --verbose --parallel --in-place --style google light9/*.py light9/*/*.py `file --no-pad  bin/* | grep 'Python script' | perl -lpe 's/:.*//'`")
 
    
 
@task
 
def test(ctx):
 
    ctx.run('docker build -f Dockerfile.build -t light9_build:latest .')
 
    ctx.run('docker run --rm -it -v `pwd`:/opt light9_build:latest'
 
            ' nose2 -v light9.currentstategraphapi_test light9.graphfile_test',
 
            pty=True)
0 comments (0 inline, 0 general)