Files @ 9fc653ee7fff
Branch filter:

Location: light9/light9/effect/sequencer/sequencer.py

drewp@bigasterisk.com
WIP redoing how Note works. The new Note outputs EffectSettings only,
and callers have to effect_eval them. Still not sure what SimpleOutputs does.
'''
copies from effectloop.py, which this should replace
'''

import asyncio
import imp
import logging
import time
import traceback
from typing import Callable, Coroutine, Dict, List, cast

from louie import All, dispatcher
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef
from twisted.internet import reactor
from twisted.internet.inotify import INotify
from twisted.python.filepath import FilePath

from light9.ascoltami.musictime_client import MusicTime
from light9.effect import effecteval
from light9.effect.sequencer import Note
from light9.effect.settings import DeviceSettings
from light9.effect.simple_outputs import SimpleOutputs
from light9.metrics import metrics
from light9.namespaces import L9, RDF
from light9.newtypes import NoteUri, Song

log = logging.getLogger('sequencer')


class StateUpdate(All):
    pass


class CodeWatcher:

    def __init__(self, onChange):
        self.onChange = onChange

        self.notifier = INotify()
        self.notifier.startReading()
        self.notifier.watch(FilePath(effecteval.__file__.replace('.pyc',
                                                                 '.py')),
                            callbacks=[self.codeChange])

    def codeChange(self, watch, path, mask):

        def go():
            log.info("reload effecteval")
            imp.reload(effecteval)
            self.onChange()

        # in case we got an event at the start of the write
        reactor.callLater(.1, go) # type: ignore


class Sequencer:
    """Notes from the graph + current song playback -> sendToCollector"""
    def __init__(self,
                 graph: SyncedGraph,
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
                 fps=40,
                 ):
        self.graph = graph
        self.fps = fps
        metrics('update_loop_goal_fps').set(self.fps)
        metrics('update_loop_goal_latency').set(1 / self.fps)
        self.sendToCollector = sendToCollector
        self.music = MusicTime(period=.2)

        self.recentUpdateTimes: List[float] = []
        self.lastStatLog = 0.0
        self._compileGraphCall = None
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
        self.simpleOutputs = SimpleOutputs(self.graph)
        self.graph.addHandler(self.compileGraph)
        self.lastLoopSucceeded = False

        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
        asyncio.create_task(self.updateLoop())

    def onCodeChange(self):
        log.debug('seq.onCodeChange')
        self.graph.addHandler(self.compileGraph)
        #self.updateLoop()

    @metrics('compile_graph').time()
    def compileGraph(self) -> None:
        """rebuild our data from the graph"""
        for song in self.graph.subjects(RDF.type, L9['Song']):

            def compileSong(song: Song = cast(Song, song)) -> None:
                self.compileSong(song)

            self.graph.addHandler(compileSong)

    @metrics('compile_song').time()
    def compileSong(self, song: Song) -> None:
        anyErrors = False
        self.notes[song] = []
        for note in self.graph.objects(song, L9['note']):
            try:
                n = Note(self.graph, NoteUri(cast(NoteUri, note)))
            except Exception:
                log.warn(f"failed to build Note {note} - skipping")
                anyErrors = True
                continue
            self.notes[song].append(n)
        if not anyErrors:
            log.info(f'built all notes for {song}')

    async def updateLoop(self):
        while True:
            frameStart = time.time()
            try:
                sec = await self.update()
            except Exception as e:
                self.lastLoopSucceeded = False
                traceback.print_exc()
                log.warn('updateLoop: %r', e)
                await asyncio.sleep(1)
                continue
            else:
                took = time.time() - frameStart
                metrics('update_loop_latency').observe(took)

                if not self.lastLoopSucceeded:
                    log.info('Sequencer.update is working')
                    self.lastLoopSucceeded = True

                delay = max(0, 1 / self.fps - took)
                await asyncio.sleep(delay)
                continue

    @metrics('update_call').time()
    async def update(self):
        with metrics('update_s0_getMusic').time():
            musicState = {'t':123.0,'song':'http://light9.bigasterisk.com/show/dance2019/song5'}#self.music.getLatest()
            if not musicState.get('song') or not isinstance(
                    musicState.get('t'), float):
                return
            song = Song(URIRef(musicState['song']))
            # print('dispsend')
            # import pdb;pdb.set_trace()
            dispatcher.send(StateUpdate,
                            update={
                                'song': str(song),
                                't': musicState['t']
                            })

        with metrics('update_s1_eval').time():
            settings = []
            songNotes = sorted(cast(List[Note], self.notes.get(song, [])), key=lambda n: n.uri)
            noteReports = []
            for note in songNotes:
                try:
                    s, report = note.outputSettings(musicState['t'])
                except Exception:
                    traceback.print_exc()
                    raise
                noteReports.append(report)
                settings.append(s)
            devSettings = DeviceSettings.fromList(self.graph, settings)
        dispatcher.send(StateUpdate, update={'songNotes': noteReports})

        with metrics('update_s3_send').time():  # our measurement
            sendSecs = await self.sendToCollector(devSettings)

        # sendToCollector's own measurement.
        # (sometimes it's None, not sure why, and neither is mypy)
        #if isinstance(sendSecs, float):
        #    metrics('update_s3_send_client').observe(sendSecs)