Files @ 04612ba3fe45
Branch filter:

Location: light9/light9/effect/sequencer/eval_faders.py

drewp@bigasterisk.com
refactor

import asyncio
import logging
import time
from typing import Callable, Coroutine, List, cast

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef

from light9.effect import effecteval
from light9.effect.sequencer import Note
from light9.effect.settings import DeviceSettings
from light9.effect.simple_outputs import SimpleOutputs
from light9.metrics import metrics
from light9.namespaces import L9, RDF
from light9.newtypes import NoteUri

log = logging.getLogger('sequencer')

class FaderEval:
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector

    The current faders become Notes in here, for more code reuse.

    Every subject typed L9['Fader'] in the graph is compiled into an untimed
    Note. On each update, every note that has an L9['value'] in the graph is
    evaluated at that strength and the combined DeviceSettings are awaited
    through sendToCollector.
    """

    def __init__(
        self,
        graph: SyncedGraph,
        sendToCollector: Callable[[DeviceSettings], Coroutine[None, None, None]],
    ):
        """Register graph handlers and schedule the first update task.

        graph: the graph to read :Fader subjects and their :value from.
        sendToCollector: coroutine function awaited with the combined
            DeviceSettings at the end of every update.

        This calls asyncio.create_task, so it must run while an event loop is
        running.
        """
        self.graph = graph
        self.sendToCollector = sendToCollector

        # Notes without times- always on
        self.notes: List[Note] = []

        # Strong references to in-flight startUpdating tasks. The event loop
        # only keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes. This must exist before
        # addHandler below, in case the graph invokes compileGraph right away.
        self._updatingTasks: set = set()

        self.simpleOutputs = SimpleOutputs(self.graph)
        self.graph.addHandler(self.compileGraph)
        self.lastLoopSucceeded = False

        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
        log.info('startupdating task')
        self._spawnUpdating()

    def _spawnUpdating(self) -> None:
        """Schedule startUpdating() as a task and hold a reference until it is done."""
        task = asyncio.create_task(self.startUpdating())
        self._updatingTasks.add(task)
        # Drop our reference once the task completes so the set doesn't grow.
        task.add_done_callback(self._updatingTasks.discard)

    async def startUpdating(self):
        """Register update() as an async handler on the graph."""
        await self.graph.addAsyncHandler(self.update)
        log.info('startupdating task done')

    def onCodeChange(self):
        """Re-register compileGraph with the graph (hook for a code watcher)."""
        log.debug('seq.onCodeChange')
        self.graph.addHandler(self.compileGraph)
        #self.updateLoop()

    @metrics('compile_graph_fader').time()
    def compileGraph(self) -> None:
        """rebuild our data from the graph"""
        self.notes = []
        for fader in self.graph.subjects(RDF.type, L9['Fader']):
            self.notes.append(self.compileFader(cast(URIRef, fader)))
        if self.notes:
            # NOTE(review): this registers update() again on every recompile
            # that yields notes (and __init__ schedules one as well). Confirm
            # that addAsyncHandler tolerates repeated registration.
            self._spawnUpdating()

    @metrics('compile_fader').time()
    def compileFader(self, fader: URIRef) -> Note:
        """Build the untimed (timed=False) Note for one fader uri."""
        return Note(self.graph, NoteUri(cast(NoteUri, fader)), effecteval,
                self.simpleOutputs, timed=False)

    @metrics('update_call_fader').time()
    async def update(self):
        """Evaluate all notes at their current :value and send the result.

        Notes with no L9['value'] in the graph are skipped (and logged). The
        remaining notes' settings are merged with DeviceSettings.fromList and
        awaited through sendToCollector.
        """
        settings = []
        now = time.time()
        for note in self.notes:
            effectValue = self.graph.value(note.uri, L9['value'])
            if effectValue is None:
                log.info(f'skip note {note}, no :value')
                continue
            # outputSettings also returns a report, which isn't used here.
            noteSettings, _ = note.outputSettings(t=time.time(), strength=float(effectValue))
            settings.append(noteSettings)
        devSettings = DeviceSettings.fromList(self.graph, settings)
        with metrics('update_s3_send_fader').time():  # our measurement
            await self.sendToCollector(devSettings)