import asyncio
import logging
import time
from typing import Callable, Coroutine, List, cast
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef
from light9.effect import effecteval
from light9.effect.sequencer import Note
from light9.effect.settings import DeviceSettings
from light9.effect.simple_outputs import SimpleOutputs
from light9.metrics import metrics
from light9.namespaces import L9, RDF
from light9.newtypes import NoteUri
# Module-level logger used by everything in this file.
# NOTE(review): registered under the name 'sequencer' although this file defines
# FaderEval -- presumably so it shares the Sequencer's logging config; confirm.
log = logging.getLogger('sequencer')
class FaderEval:
    """Peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector.

    The current faders become Notes in here, for more code reuse.

    Must be constructed while an asyncio event loop is running, because the
    constructor schedules a background task with ``asyncio.create_task``.
    """

    def __init__(
        self,
        graph: SyncedGraph,
        sendToCollector: Callable[[DeviceSettings], Coroutine[None, None, None]],
    ):
        """Register graph handlers and schedule the first update registration.

        Args:
            graph: graph that holds the :Fader resources and their :value.
            sendToCollector: coroutine function awaited with the combined
                DeviceSettings each time `update` runs.
        """
        self.graph = graph
        self.sendToCollector = sendToCollector

        # Notes without times- always on
        self.notes: List[Note] = []

        # Strong references to in-flight background tasks. The event loop only
        # keeps weak references, so an unreferenced task can be garbage
        # collected before it finishes. This must exist before addHandler
        # below, since compileGraph may spawn a task.
        self._tasks: set = set()

        self.simpleOutputs = SimpleOutputs(self.graph)
        self.graph.addHandler(self.compileGraph)
        self.lastLoopSucceeded = False

        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
        log.info('startupdating task')
        self._spawn(self.startUpdating())

    def _spawn(self, coro: Coroutine[None, None, None]) -> 'asyncio.Task':
        """Schedule `coro` as a task and hold a reference until it completes."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop our reference once the task is done so the set does not grow.
        task.add_done_callback(self._tasks.discard)
        return task

    async def startUpdating(self):
        """Register `update` as an async handler on the graph."""
        await self.graph.addAsyncHandler(self.update)
        log.info('startupdating task done')

    def onCodeChange(self):
        """Re-register `compileGraph` with the graph (hook for a code watcher)."""
        log.debug('seq.onCodeChange')
        self.graph.addHandler(self.compileGraph)
        #self.updateLoop()

    @metrics('compile_graph_fader').time()
    def compileGraph(self) -> None:
        """Rebuild our data from the graph.

        Replaces `self.notes` with one Note per subject typed L9['Fader'];
        when any were found, schedules `startUpdating` again.
        """
        self.notes = []
        for fader in self.graph.subjects(RDF.type, L9['Fader']):
            self.notes.append(self.compileFader(cast(URIRef, fader)))
        if self.notes:
            self._spawn(self.startUpdating())

    @metrics('compile_fader').time()
    def compileFader(self, fader: URIRef) -> Note:
        """Build the untimed Note that stands in for one fader."""
        return Note(self.graph, NoteUri(fader), effecteval,
                    self.simpleOutputs, timed=False)

    @metrics('update_call_fader').time()
    async def update(self):
        """Evaluate every note at its current :value and send the result.

        Notes with no L9['value'] in the graph are skipped (and logged). The
        remaining per-note settings are combined with DeviceSettings.fromList
        and passed to `sendToCollector`.
        """
        settings = []
        for note in self.notes:
            effectValue = self.graph.value(note.uri, L9['value'])
            if effectValue is None:
                log.info(f'skip note {note}, no :value')
                continue

            # outputSettings also returns a report, which is not used here.
            s, _report = note.outputSettings(t=time.time(), strength=float(effectValue))
            settings.append(s)
        devSettings = DeviceSettings.fromList(self.graph, settings)
        with metrics('update_s3_send_fader').time():  # our measurement
            await self.sendToCollector(devSettings)