Mercurial > code > home > repos > light9
view light9/effect/sequencer/eval_faders.py @ 2186:04612ba3fe45
refactor
author | drewp@bigasterisk.com |
---|---|
date | Fri, 19 May 2023 20:55:28 -0700 |
parents | bf07831a5339 |
children | 9fc653ee7fff |
line wrap: on
line source
import asyncio import logging import time from typing import Callable, Coroutine, List, cast from rdfdb.syncedgraph.syncedgraph import SyncedGraph from rdflib import URIRef from light9.effect import effecteval from light9.effect.sequencer import Note from light9.effect.settings import DeviceSettings from light9.effect.simple_outputs import SimpleOutputs from light9.metrics import metrics from light9.namespaces import L9, RDF from light9.newtypes import NoteUri log = logging.getLogger('sequencer') class FaderEval: """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector The current faders become Notes in here, for more code reuse. """ def __init__(self, graph: SyncedGraph, sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]], ): self.graph = graph self.sendToCollector = sendToCollector # Notes without times- always on self.notes: List[Note] = [] self.simpleOutputs = SimpleOutputs(self.graph) self.graph.addHandler(self.compileGraph) self.lastLoopSucceeded = False # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange) log.info('startupdating task') asyncio.create_task(self.startUpdating()) async def startUpdating(self): await self.graph.addAsyncHandler(self.update) log.info('startupdating task done') def onCodeChange(self): log.debug('seq.onCodeChange') self.graph.addHandler(self.compileGraph) #self.updateLoop() @metrics('compile_graph_fader').time() def compileGraph(self) -> None: """rebuild our data from the graph""" self.notes = [] for fader in self.graph.subjects(RDF.type, L9['Fader']): def compileFader() -> Note: return self.compileFader(cast(URIRef, fader)) self.notes.append(compileFader()) if self.notes: asyncio.create_task(self.startUpdating()) @metrics('compile_fader').time() def compileFader(self, fader: URIRef) -> Note: return Note(self.graph, NoteUri(cast(NoteUri, fader)), effecteval, self.simpleOutputs, timed=False) @metrics('update_call_fader').time() async def update(self): settings = [] for note in self.notes: effectValue = self.graph.value(note.uri, L9['value']) if effectValue is None: log.info(f'skip note {note}, no :value') continue s, report = note.outputSettings(t=time.time(), strength=float(effectValue)) settings.append(s) devSettings = DeviceSettings.fromList(self.graph, settings) with metrics('update_s3_send_fader').time(): # our measurement sendSecs = await self.sendToCollector(devSettings)