Mercurial > code > home > repos > light9
view light9/effect/sequencer/sequencer.py @ 2357:ccd04278e357
metrics cleanup
author | drewp@bigasterisk.com |
---|---|
date | Sat, 03 Jun 2023 17:15:40 -0700 |
parents | 8c82f13a3298 |
children |
line wrap: on
line source
"""
copies from effectloop.py, which this should replace
"""
import asyncio
import importlib
import logging
import time
import traceback
from typing import Callable, Coroutine, Dict, List, cast

from louie import All, dispatcher
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef
from twisted.internet import reactor
from twisted.internet.inotify import INotify
from twisted.python.filepath import FilePath

from light9.ascoltami.musictime_client import MusicTime
from light9.effect import effecteval
from light9.effect.sequencer import Note
from light9.effect.settings import DeviceSettings
from light9.metrics import metrics
from light9.namespaces import L9, RDF
from light9.newtypes import NoteUri, Song

log = logging.getLogger('sequencer')


class StateUpdate(All):
    """Signal class passed to dispatcher.send() by Sequencer.update."""
    pass


class CodeWatcher:
    """Reload the effecteval module when its source file changes.

    An INotify watch is placed on effecteval's .py file; each event
    schedules a reload followed by a call to onChange.
    """

    def __init__(self, onChange):
        """
        onChange: zero-argument callable invoked after effecteval has been
        reloaded.
        """
        self.onChange = onChange

        self.notifier = INotify()
        self.notifier.startReading()
        # __file__ may point at the compiled .pyc; watch the source instead.
        self.notifier.watch(FilePath(effecteval.__file__.replace('.pyc', '.py')),
                            callbacks=[self.codeChange])

    def codeChange(self, watch, path, mask):
        """inotify callback: reload effecteval shortly after a change event."""

        def go():
            log.info("reload effecteval")
            importlib.reload(effecteval)
            self.onChange()

        # in case we got an event at the start of the write
        reactor.callLater(.1, go)  # type: ignore


class Sequencer:
    """Notes from the graph + current song playback -> sendToCollector"""

    def __init__(
        self,
        graph: SyncedGraph,
        sendToCollector: Callable[[DeviceSettings], Coroutine[None, None, None]],
        fps=40,
    ):
        """
        graph: graph that Songs and their notes are read from.
        sendToCollector: coroutine function awaited once per frame with the
          DeviceSettings computed for that frame.
        fps: target number of update() calls per second.

        Must be called while an asyncio event loop is running, since it
        starts the update loop with asyncio.create_task.
        """
        self.graph = graph
        self.fps = fps
        metrics('update_loop_goal_fps').set(self.fps)
        metrics('update_loop_goal_latency').set(1 / self.fps)
        self.sendToCollector = sendToCollector
        self.music = MusicTime(period=.2)

        self.recentUpdateTimes: List[float] = []
        self.lastStatLog = 0.0
        self._compileGraphCall = None
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
        # NOTE(review): SimpleOutputs is not imported or defined in the
        # visible part of this module; confirm where it should come from.
        self.simpleOutputs = SimpleOutputs(self.graph)
        self.graph.addHandler(self.compileGraph)
        self.lastLoopSucceeded = False
        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)

        # Keep a strong reference to the task: the event loop only holds a
        # weak one, so an unreferenced task can be garbage-collected while
        # it is still running.
        self._updateLoopTask = asyncio.create_task(self.updateLoop())

    def onCodeChange(self):
        """Re-register compileGraph with the graph (meant as CodeWatcher's onChange)."""
        log.debug('seq.onCodeChange')
        self.graph.addHandler(self.compileGraph)
        #self.updateLoop()

    def compileGraph(self) -> None:
        """rebuild our data from the graph"""
        for song in self.graph.subjects(RDF.type, L9['Song']):

            # Bind the current song as a default argument so each handler
            # keeps its own song rather than the loop's last value.
            def compileSong(song: Song = cast(Song, song)) -> None:
                self.compileSong(song)

            self.graph.addHandler(compileSong)

    def compileSong(self, song: Song) -> None:
        """Rebuild self.notes[song] from the song's L9['note'] objects.

        A note that fails to construct is logged and skipped; the remaining
        notes are still collected.
        """
        anyErrors = False
        self.notes[song] = []
        for note in self.graph.objects(song, L9['note']):
            try:
                n = Note(self.graph, NoteUri(cast(NoteUri, note)))
            except Exception:
                log.warning(f"failed to build Note {note} - skipping")
                anyErrors = True
                continue
            self.notes[song].append(n)
        if not anyErrors:
            log.info(f'built all notes for {song}')

    async def updateLoop(self):
        """Call update() forever, pacing iterations toward self.fps.

        A failed update is printed and logged, then retried after one
        second. A successful update records its latency and sleeps for
        whatever remains of the 1/fps frame budget.
        """
        while True:
            # Monotonic clock: frame timing must not be disturbed by
            # wall-clock adjustments.
            frameStart = time.monotonic()
            try:
                await self.update()
            except Exception as e:
                self.lastLoopSucceeded = False
                traceback.print_exc()
                log.warning('updateLoop: %r', e)
                await asyncio.sleep(1)
                continue

            took = time.monotonic() - frameStart
            metrics('update_loop_latency').observe(took)

            if not self.lastLoopSucceeded:
                log.info('Sequencer.update is working')
                self.lastLoopSucceeded = True

            await asyncio.sleep(max(0, 1 / self.fps - took))

    async def update(self):
        """Evaluate the current song's notes and send the result out.

        Dispatches StateUpdate signals carrying the song/time and the
        per-note reports, then awaits sendToCollector with the combined
        DeviceSettings. Any exception raised by a note's outputSettings is
        printed and re-raised.
        """
        with metrics('update_s0_getMusic').time():
            # NOTE(review): playback state is hard-coded here and the real
            # call (self.music.getLatest()) is commented out, so the check
            # below always passes with these constants.
            musicState = {
                't': 123.0,
                'song': 'http://light9.bigasterisk.com/show/dance2019/song5'
            }  #self.music.getLatest()
            if not musicState.get('song') or not isinstance(
                    musicState.get('t'), float):
                return
            song = Song(URIRef(musicState['song']))
            dispatcher.send(StateUpdate,
                            update={
                                'song': str(song),
                                't': musicState['t']
                            })

        with metrics('update_s1_eval').time():
            settings = []
            # Sort by uri so notes are evaluated in a stable order.
            songNotes = sorted(cast(List[Note], self.notes.get(song, [])),
                               key=lambda n: n.uri)
            noteReports = []
            for note in songNotes:
                try:
                    s, report = note.outputSettings(musicState['t'])
                except Exception:
                    traceback.print_exc()
                    raise
                noteReports.append(report)
                settings.append(s)
            devSettings = DeviceSettings.fromList(self.graph, settings)

        dispatcher.send(StateUpdate, update={'songNotes': noteReports})

        with metrics('update_s3_send').time():  # our measurement
            sendSecs = await self.sendToCollector(devSettings)

        # sendToCollector's own measurement.
        # (sometimes it's None, not sure why, and neither is mypy)
        #if isinstance(sendSecs, float):
        #    metrics('update_s3_send_client').observe(sendSecs)