Changeset - bf07831a5339
[Not reviewed]
default
0 5 1
drewp@bigasterisk.com - 20 months ago 2023-05-20 01:20:19
drewp@bigasterisk.com
refactor
6 files changed with 121 insertions and 132 deletions:
0 comments (0 inline, 0 general)
light9/collector/weblisteners.py
Show inline comments
 
@@ -30,26 +30,25 @@ class WebListeners(object):
 
        asyncio.create_task(self.flusher())
 

	
 
    def addClient(self, client: UiListener):
 
        self.clients.append((client, {}))  # seen = {dev: attrs}
 
        log.info('added client %s %s', len(self.clients), client)
 
        # todo: it would be nice to immediately fill in the client on the
 
        # latest settings, but I lost them so I can't.
 

	
 
    def delClient(self, client: UiListener):
 
        self.clients = [(c, t) for c, t in self.clients if c != client]
 
        log.info('delClient %s, %s left', client, len(self.clients))
 

	
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
 
                                                                                                                               DmxMessageIndex]]):
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]):
 
        """called often- don't be slow"""
 
        self.pendingMessageForDev[dev] = (attrs, outputMap)
 
        # maybe put on a stack for flusher or something
 

	
 
    async def flusher(self):
 
        await asyncio.sleep(3)  # help startup faster?
 
        while True:
 
            await self._flush()
 
            await asyncio.sleep(.05)
 

	
 
    async def _flush(self):
 
        now = time.time()
light9/effect/effecteval.py
Show inline comments
 
@@ -11,28 +11,28 @@ from typing import Dict, Tuple, Any
 
from PIL import Image
 
import random
 

	
 
SKY = Namespace('http://light9.bigasterisk.com/theater/skyline/device/')
 

	
 
random.seed(0)
 

	
 
log = logging.getLogger('effecteval')
 
log.info("reload effecteval")
 

	
 

	
 
def literalColor(rnorm, gnorm, bnorm):
 
    return Literal(
 
        rgb_to_hex([int(rnorm * 255),
 
                    int(gnorm * 255),
 
                    int(bnorm * 255)]))
 
    return Literal(rgb_to_hex((
 
        int(rnorm * 255),  #
 
        int(gnorm * 255),  # 
 
        int(bnorm * 255))))
 

	
 

	
 
def literalColorHsv(h, s, v):
 
    return literalColor(*hsv_to_rgb(h, s, v))
 

	
 

	
 
def nsin(x):
 
    return (math.sin(x * (2 * math.pi)) + 1) / 2
 

	
 

	
 
def ncos(x):
 
    return (math.cos(x * (2 * math.pi)) + 1) / 2
 
@@ -73,135 +73,119 @@ class EffectEval(object):
 
    def __init__(self, graph, effect, simpleOutputs):
 
        self.graph = graph
 
        self.effect = effect
 
        self.simpleOutputs = simpleOutputs
 

	
 
    def outputFromEffect(self, effectSettings, songTime, noteTime):
 
        """
 
        From effect attr settings, like strength=0.75, to output device
 
        settings like light1/bright=0.72;light2/bright=0.78. This runs
 
        the effect code.
 
        """
 
        # both callers need to apply note overrides
 
        effectSettings = dict(
 
            effectSettings
 
        )  # we should make everything into nice float and Color objects too
 
        effectSettings = dict(effectSettings)  # we should make everything into nice float and Color objects too
 

	
 
        strength = float(effectSettings[L9['strength']])
 
        if strength <= 0:
 
            return DeviceSettings(self.graph, []), {'zero': True}
 

	
 
        report = {}
 
        out: Dict[Tuple[URIRef, URIRef], Any] = {}  # (dev, attr): value
 

	
 
        out.update(
 
            self.simpleOutputs.values(
 
                self.effect, strength,
 
                effectSettings.get(L9['colorScale'], None)))
 
        out.update(self.simpleOutputs.values(self.effect, strength, effectSettings.get(L9['colorScale'], None)))
 

	
 
        if self.effect.startswith(L9['effect/']):
 
            tail = 'effect_' + self.effect[len(L9['effect/']):]
 
            try:
 
                func = globals()[tail]
 
            except KeyError:
 
                report['error'] = 'effect code not found for %s' % self.effect
 
            else:
 
                out.update(func(effectSettings, strength, songTime, noteTime))
 

	
 
        outList = [(d, a, v) for (d, a), v in out.items()]
 
        return DeviceSettings(self.graph, outList), report
 

	
 

	
 
def effect_Curtain(effectSettings, strength, songTime, noteTime):
 
    return {(L9['device/lowPattern%s' % n], L9['color']):
 
            literalColor(strength, strength, strength)
 
            for n in range(301, 308 + 1)}
 
    return {(L9['device/lowPattern%s' % n], L9['color']): literalColor(strength, strength, strength) for n in range(301, 308 + 1)}
 

	
 

	
 
def effect_animRainbow(effectSettings, strength, songTime, noteTime):
 
    out = {}
 
    tint = effectSettings.get(L9['tint'], '#ffffff')
 
    tintStrength = float(effectSettings.get(L9['tintStrength'], 0))
 
    tr, tg, tb = hex_to_rgb(tint)
 
    for n in range(1, 5 + 1):
 
        scl = strength * nsin(songTime + n * .3)**3
 
        col = literalColor(
 
            scl * lerp(nsin(songTime + n * .2), tr / 255, tintStrength),
 
            scl * lerp(nsin(songTime + n * .2 + .3), tg / 255, tintStrength),
 
            scl * lerp(nsin(songTime + n * .3 + .6), tb / 255, tintStrength))
 
        col = literalColor(scl * lerp(nsin(songTime + n * .2), tr / 255, tintStrength), scl * lerp(nsin(songTime + n * .2 + .3), tg / 255, tintStrength),
 
                           scl * lerp(nsin(songTime + n * .3 + .6), tb / 255, tintStrength))
 

	
 
        dev = L9['device/aura%s' % n]
 
        out.update({
 
            (dev, L9['color']): col,
 
            (dev, L9['zoom']): .9,
 
        })
 
        ang = songTime * 4
 
        out.update({
 
            (dev, L9['rx']):
 
            lerp(.27, .7, (n - 1) / 4) + .2 * math.sin(ang + n),
 
            (dev, L9['ry']):
 
            lerp(.46, .52, (n - 1) / 4) + .5 * math.cos(ang + n),
 
            (dev, L9['rx']): lerp(.27, .7, (n - 1) / 4) + .2 * math.sin(ang + n),
 
            (dev, L9['ry']): lerp(.46, .52, (n - 1) / 4) + .5 * math.cos(ang + n),
 
        })
 
    return out
 

	
 

	
 
def effect_auraSparkles(effectSettings, strength, songTime, noteTime):
 
    out = {}
 
    tint = effectSettings.get(L9['tint'], '#ffffff')
 
    print(effectSettings)
 
    tr, tg, tb = hex_to_rgb(tint)
 
    for n in range(1, 5 + 1):
 
        scl = strength * ((int(songTime * 10) % n) < 1)
 
        col = literalColorHsv((songTime + (n / 5)) % 1, 1, scl)
 

	
 
        dev = L9['device/aura%s' % n]
 
        out.update({
 
            (dev, L9['color']): col,
 
            (dev, L9['zoom']): .95,
 
        })
 
        ang = songTime * 4
 
        out.update({
 
            (dev, L9['rx']):
 
            lerp(.27, .8, (n - 1) / 4) + .2 * math.sin(ang + n),
 
            (dev, L9['ry']):
 
            lerp(.46, .52, (n - 1) / 4) + .4 * math.cos(ang + n),
 
            (dev, L9['rx']): lerp(.27, .8, (n - 1) / 4) + .2 * math.sin(ang + n),
 
            (dev, L9['ry']): lerp(.46, .52, (n - 1) / 4) + .4 * math.cos(ang + n),
 
        })
 
    return out
 

	
 

	
 
def effect_qpan(effectSettings, strength, songTime, noteTime):
 
    dev = L9['device/q2']
 
    dur = 4
 
    col = scale(scale('#ffffff', strength),
 
                effectSettings.get(L9['colorScale']) or '#ffffff')
 
    col = scale(scale('#ffffff', strength), effectSettings.get(L9['colorScale']) or '#ffffff')
 
    return {
 
        (dev, L9['color']): col,
 
        (dev, L9['focus']): 0.589,
 
        (dev, L9['rx']): lerp(0.778, 0.291, clamp(0, 1, noteTime / dur)),
 
        (dev, L9['ry']): 0.5,
 
        (dev, L9['zoom']): 0.714,
 
    }
 

	
 

	
 
def effect_pulseRainbow(effectSettings, strength, songTime, noteTime):
 
    out = {}
 
    tint = effectSettings.get(L9['tint'], '#ffffff')
 
    tintStrength = float(effectSettings.get(L9['tintStrength'], 0))
 
    tr, tg, tb = hex_to_rgb(tint)
 
    for n in range(1, 5 + 1):
 
        scl = strength
 
        col = literalColor(
 
            scl * lerp(nsin(songTime + n * .2), tr / 255, tintStrength),
 
            scl * lerp(nsin(songTime + n * .2 + .3), tg / 255, tintStrength),
 
            scl * lerp(nsin(songTime + n * .3 + .6), tb / 255, tintStrength))
 
        col = literalColor(scl * lerp(nsin(songTime + n * .2), tr / 255, tintStrength), scl * lerp(nsin(songTime + n * .2 + .3), tg / 255, tintStrength),
 
                           scl * lerp(nsin(songTime + n * .3 + .6), tb / 255, tintStrength))
 

	
 
        dev = L9['device/aura%s' % n]
 
        out.update({
 
            (dev, L9['color']): col,
 
            (dev, L9['zoom']): .5,
 
        })
 
        out.update({
 
            (dev, L9['rx']): lerp(.27, .7, (n - 1) / 4),
 
            (dev, L9['ry']): lerp(.46, .52, (n - 1) / 4),
 
        })
 
    return out
 

	
 
@@ -239,55 +223,49 @@ def effect_qsweep(effectSettings, streng
 
    period = float(effectSettings.get(L9['period'], 2))
 

	
 
    col = effectSettings.get(L9['colorScale'], '#ffffff')
 
    col = scale(col, effectSettings.get(L9['strength'], 1))
 

	
 
    for n in range(1, 3 + 1):
 
        dev = L9['device/q%s' % n]
 
        out.update({
 
            (dev, L9['color']): col,
 
            (dev, L9['zoom']): effectSettings.get(L9['zoom'], .5),
 
        })
 
        out.update({
 
            (dev, L9['rx']):
 
            lerp(.3, .8, nsin(songTime / period + n / 4)),
 
            (dev, L9['ry']):
 
            effectSettings.get(L9['ry'], .2),
 
            (dev, L9['rx']): lerp(.3, .8, nsin(songTime / period + n / 4)),
 
            (dev, L9['ry']): effectSettings.get(L9['ry'], .2),
 
        })
 
    return out
 

	
 

	
 
def effect_qsweepusa(effectSettings, strength, songTime, noteTime):
 
    out = {}
 
    period = float(effectSettings.get(L9['period'], 2))
 

	
 
    colmap = {
 
        1: '#ff0000',
 
        2: '#998888',
 
        3: '#0050ff',
 
    }
 

	
 
    for n in range(1, 3 + 1):
 
        dev = L9['device/q%s' % n]
 
        out.update({
 
            (dev, L9['color']):
 
            scale(colmap[n], effectSettings.get(L9['strength'], 1)),
 
            (dev, L9['zoom']):
 
            effectSettings.get(L9['zoom'], .5),
 
            (dev, L9['color']): scale(colmap[n], effectSettings.get(L9['strength'], 1)),
 
            (dev, L9['zoom']): effectSettings.get(L9['zoom'], .5),
 
        })
 
        out.update({
 
            (dev, L9['rx']):
 
            lerp(.3, .8, nsin(songTime / period + n / 4)),
 
            (dev, L9['ry']):
 
            effectSettings.get(L9['ry'], .5),
 
            (dev, L9['rx']): lerp(.3, .8, nsin(songTime / period + n / 4)),
 
            (dev, L9['ry']): effectSettings.get(L9['ry'], .5),
 
        })
 
    return out
 

	
 

	
 
chase1_members = [
 
    DEV['backlight1'],
 
    DEV['lip1'],
 
    DEV['backlight2'],
 
    DEV['down2'],
 
    DEV['lip2'],
 
    DEV['backlight3'],
 
    DEV['down3'],
 
@@ -370,39 +348,36 @@ def effect_orangeSearch(effectSettings, 
 
        (dev, L9['rx']): lerp(.65, 1, nsin(songTime / 2.0)),
 
        (dev, L9['ry']): .6,
 
        (dev, L9['zoom']): 1,
 
    }
 

	
 

	
 
def effect_Strobe(effectSettings, strength, songTime, noteTime):
 
    rate = 2
 
    duty = .3
 
    offset = 0
 
    f = (((songTime + offset) * rate) % 1.0)
 
    c = (f < duty) * strength
 
    col = rgb_to_hex([int(c * 255), int(c * 255), int(c * 255)])
 
    col = rgb_to_hex((int(c * 255), int(c * 255), int(c * 255)))
 
    return {(L9['device/colorStrip'], L9['color']): Literal(col)}
 

	
 

	
 
def effect_lightning(effectSettings, strength, songTime, noteTime):
 
    devs = [
 
        L9['device/veryLow1'], L9['device/veryLow2'], L9['device/veryLow3'],
 
        L9['device/veryLow4'], L9['device/veryLow5'], L9['device/backlight1'],
 
        L9['device/backlight2'], L9['device/backlight3'],
 
        L9['device/backlight4'], L9['device/backlight5'], L9['device/down2'],
 
        L9['device/down3'], L9['device/down4'], L9['device/hexLow3'],
 
        L9['device/hexLow5'], L9['device/postL1'], L9['device/postR1']
 
        L9['device/veryLow1'], L9['device/veryLow2'], L9['device/veryLow3'], L9['device/veryLow4'], L9['device/veryLow5'], L9['device/backlight1'],
 
        L9['device/backlight2'], L9['device/backlight3'], L9['device/backlight4'], L9['device/backlight5'], L9['device/down2'], L9['device/down3'],
 
        L9['device/down4'], L9['device/hexLow3'], L9['device/hexLow5'], L9['device/postL1'], L9['device/postR1']
 
    ]
 
    out = {}
 
    col = rgb_to_hex([int(255 * strength)] * 3)
 
    col = rgb_to_hex((int(255 * strength),) * 3)
 
    for i, dev in enumerate(devs):
 
        n = noise(songTime * 8 + i * 6.543)
 
        if n > .4:
 
            out[(dev, L9['color'])] = col
 
    return out
 

	
 

	
 
def sample8(img, x, y, repeat=False):
 
    if not (0 <= y < img.height):
 
        return (0, 0, 0)
 
    if 0 <= x < img.width:
 
        return img.getpixel((x, y))
 
@@ -443,25 +418,25 @@ def effect_image(effectSettings, strengt
 
        ('down6', 16),
 
        ('down7', 17),
 
    ]:
 
        color8 = sample8(img, x, y, effectSettings.get(L9['repeat'], True))
 
        color = map(lambda v: v / 255 * strength, color8)
 
        color = [v * cs / 255 for v, cs in zip(color, colorScale)]
 
        if dev in ['cyc1', 'cyc2', 'cyc3', 'cyc4']:
 
            column = dev[-1]
 
            out[(SKY[f'cycRed{column}'], L9['brightness'])] = color[0]
 
            out[(SKY[f'cycGreen{column}'], L9['brightness'])] = color[1]
 
            out[(SKY[f'cycBlue{column}'], L9['brightness'])] = color[2]
 
        else:
 
            out[(dev, L9['color'])] = rgb_to_hex(map(_8bit, color))
 
            out[(dev, L9['color'])] = rgb_to_hex(tuple(map(_8bit, color)))
 
    return out
 

	
 

	
 
def effect_cyc(effectSettings, strength, songTime, noteTime):
 
    colorScale = effectSettings.get(L9['colorScale'], '#ffffff')
 
    r, g, b = map(lambda x: strength * x / 255, hex_to_rgb(colorScale))
 

	
 
    out = {
 
        (SKY['cycRed1'], L9['brightness']): r,
 
        (SKY['cycRed2'], L9['brightness']): r,
 
        (SKY['cycRed3'], L9['brightness']): r,
 
        (SKY['cycRed4'], L9['brightness']): r,
 
@@ -518,22 +493,16 @@ def effect_cycChase1(effectSettings, str
 
            })
 
    return out
 

	
 

	
 
def effect_parNoise(effectSettings, strength, songTime, noteTime):
 
    colorScale = effectSettings.get(L9['colorScale'], '#ffffff')
 
    r, g, b = map(lambda x: x / 255, hex_to_rgb(colorScale))
 
    out = {}
 
    speed = 10
 
    gamma = .6
 
    for dev in [SKY['strip1'], SKY['strip2'], SKY['strip3']]:
 
        out[(dev, L9['color'])] = scale(
 
            rgb_to_hex(
 
                (_8bit(r * math.pow(max(.01, noise(speed * songTime)), gamma)),
 
                 _8bit(g *
 
                       math.pow(max(.01, noise(speed * songTime + 10)), gamma)),
 
                 _8bit(
 
                     b *
 
                     math.pow(max(.01, noise(speed * songTime + 20)), gamma)))),
 
            strength)
 
            rgb_to_hex((_8bit(r * math.pow(max(.01, noise(speed * songTime)), gamma)), _8bit(g * math.pow(max(.01, noise(speed * songTime + 10)), gamma)),
 
                        _8bit(b * math.pow(max(.01, noise(speed * songTime + 20)), gamma)))), strength)
 

	
 
    return out
light9/effect/sequencer/eval_faders.py
Show inline comments
 
new file 100644
 

	
 
import asyncio
 
import logging
 
import time
 
from typing import Callable, Coroutine, List, cast
 
from light9.effect.sequencer.sequencer import Note
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import URIRef
 

	
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from light9.metrics import metrics
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import NoteUri
 

	
 
log = logging.getLogger('sequencer')
 
class FaderEval:
 
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector
 
    
 
    The current faders become Notes in here, for more code reuse.
 
    """
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
 
                 ):
 
        self.graph = graph
 
        self.sendToCollector = sendToCollector
 

	
 
        # Notes without times- always on
 
        self.notes: List[Note] = []
 

	
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.lastLoopSucceeded = False
 

	
 
        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        log.info('startupdating task')
 
        asyncio.create_task(self.startUpdating())
 

	
 
    async def startUpdating(self):
 
        await self.graph.addAsyncHandler(self.update)
 
        log.info('startupdating task done')
 

	
 
    def onCodeChange(self):
 
        log.debug('seq.onCodeChange')
 
        self.graph.addHandler(self.compileGraph)
 
        #self.updateLoop()
 

	
 
    @metrics('compile_graph_fader').time()
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        self.notes = []
 
        for fader in self.graph.subjects(RDF.type, L9['Fader']):          
 
            def compileFader() -> Note:
 
                return self.compileFader(cast(URIRef, fader))
 

	
 
            self.notes.append(compileFader())
 
        if self.notes:
 
            asyncio.create_task(self.startUpdating())
 

	
 

	
 
    @metrics('compile_fader').time()
 
    def compileFader(self, fader: URIRef) -> Note:
 
        return Note(self.graph, NoteUri(cast(NoteUri, fader)), effecteval,
 
                self.simpleOutputs, timed=False)
 
    
 
    @metrics('update_call_fader').time()
 
    async def update(self):
 
        settings = []
 
        for note in self.notes:
 
            effectValue = self.graph.value(note.uri, L9['value'])
 
            if effectValue is None:
 
                log.info(f'skip note {note}, no :value')
 
                continue
 
            s, report = note.outputSettings(t=time.time(), strength=float(effectValue))
 
            settings.append(s)
 
        devSettings = DeviceSettings.fromList(self.graph, settings)
 
        with metrics('update_s3_send_fader').time():  # our measurement
 
            sendSecs = await self.sendToCollector(devSettings)
light9/effect/sequencer/sequencer.py
Show inline comments
 
@@ -149,40 +149,40 @@ class CodeWatcher(object):
 
        self.notifier.watch(FilePath(effecteval.__file__.replace('.pyc',
 
                                                                 '.py')),
 
                            callbacks=[self.codeChange])
 

	
 
    def codeChange(self, watch, path, mask):
 

	
 
        def go():
 
            log.info("reload effecteval")
 
            imp.reload(effecteval)
 
            self.onChange()
 

	
 
        # in case we got an event at the start of the write
 
        reactor.callLater(.1, go)
 
        reactor.callLater(.1, go) # type: ignore
 

	
 

	
 
class Sequencer(object):
 
    """Notes from the graph + current song playback -> sendToCollector"""
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
 
                 fps=40,
 
                 ):
 
        self.graph = graph
 
        self.fps = fps
 
        metrics('update_loop_goal_fps').set(self.fps)
 
        metrics('update_loop_goal_latency').set(1 / self.fps)
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 
        self.music = MusicTime(period=.2)
 

	
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
        self._compileGraphCall = None
 
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.lastLoopSucceeded = False
 

	
 
        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        asyncio.create_task(self.updateLoop())
 

	
 
@@ -269,76 +269,12 @@ class Sequencer(object):
 
                noteReports.append(report)
 
                settings.append(s)
 
            devSettings = DeviceSettings.fromList(self.graph, settings)
 
        dispatcher.send(StateUpdate, update={'songNotes': noteReports})
 

	
 
        with metrics('update_s3_send').time():  # our measurement
 
            sendSecs = await self.sendToCollector(devSettings)
 

	
 
        # sendToCollector's own measurement.
 
        # (sometimes it's None, not sure why, and neither is mypy)
 
        #if isinstance(sendSecs, float):
 
        #    metrics('update_s3_send_client').observe(sendSecs)
 

	
 
class FaderEval:
 
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector
 
    
 
    The current faders become Notes in here, for more code reuse.
 
    """
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
 
                 ):
 
        self.graph = graph
 
        self.sendToCollector = sendToCollector
 

	
 
        # Notes without times- always on
 
        self.notes: List[Note] = []
 

	
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.lastLoopSucceeded = False
 

	
 
        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        log.info('startupdating task')
 
        asyncio.create_task(self.startUpdating())
 

	
 
    async def startUpdating(self):
 
        await self.graph.addAsyncHandler(self.update)
 
        log.info('startupdating task done')
 

	
 
    def onCodeChange(self):
 
        log.debug('seq.onCodeChange')
 
        self.graph.addHandler(self.compileGraph)
 
        #self.updateLoop()
 

	
 
    @metrics('compile_graph_fader').time()
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        self.notes = []
 
        for fader in self.graph.subjects(RDF.type, L9['Fader']):          
 
            def compileFader() -> Note:
 
                return self.compileFader(cast(URIRef, fader))
 

	
 
            self.notes.append(compileFader())
 
        if self.notes:
 
            asyncio.create_task(self.startUpdating())
 

	
 

	
 
    @metrics('compile_fader').time()
 
    def compileFader(self, fader: URIRef) -> Note:
 
        return Note(self.graph, NoteUri(cast(NoteUri, fader)), effecteval,
 
                self.simpleOutputs, timed=False)
 
    
 
    @metrics('update_call_fader').time()
 
    async def update(self):
 
        settings = []
 
        for note in self.notes:
 
            effectValue = self.graph.value(note.uri, L9['value'])
 
            if effectValue is None:
 
                log.info(f'skip note {note}, no :value')
 
                continue
 
            s, report = note.outputSettings(t=time.time(), strength=float(effectValue))
 
            settings.append(s)
 
        devSettings = DeviceSettings.fromList(self.graph, settings)
 
        with metrics('update_s3_send_fader').time():  # our measurement
 
            sendSecs = await self.sendToCollector(devSettings)
light9/effect/sequencer/service.py
Show inline comments
 
"""
 
plays back effect notes from the timeline (and an untimed note from the faders)
 
"""
 

	
 
import asyncio
 
import json
 
import logging
 
import time
 

	
 
from light9 import networking
 
from light9.collector.collector_client_asyncio import sendToCollector
 
from light9.effect.sequencer.sequencer import FaderEval, Sequencer, StateUpdate
 
from light9.effect.sequencer.eval_faders import FaderEval
 
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
 
from light9.effect.settings import DeviceSettings
 
from light9.metrics import metrics
 
from light9.run_local import log
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.applications import Starlette
 
from starlette.routing import Route
 
from starlette.types import Receive, Scope, Send
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 

	
light9/effect/simple_outputs.py
Show inline comments
 
import traceback
 
from light9.namespaces import L9, RDF
 
from light9.effect.scale import scale
 
from typing import Dict, List, Tuple, Any
 
from rdflib import URIRef
 

	
 

	
 
class SimpleOutputs(object):
 
    """
 
    Watches graph for effects that are just fading output attrs. 
 
    Call `values` to get (dev,attr):value settings.
 
    """
 

	
 
    def __init__(self, graph):
 
        self.graph = graph
 

	
 
        # effect : [(dev, attr, value, isScaled)]
 
        self.effectOutputs: Dict[URIRef, List[
 
            Tuple[URIRef, URIRef, Any, bool]]] = {}
 
        self.effectOutputs: Dict[URIRef, List[Tuple[URIRef, URIRef, Any, bool]]] = {}
 

	
 
        self.graph.addHandler(self.updateEffectsFromGraph)
 

	
 
    def updateEffectsFromGraph(self):
 
        for effect in self.graph.subjects(RDF.type, L9['Effect']):
 
            settings = []
 
            for setting in self.graph.objects(effect, L9['setting']):
 
                settingValues = dict(self.graph.predicate_objects(setting))
 
                try:
 
                    d = settingValues.get(L9['device'], None)
 
                    a = settingValues.get(L9['deviceAttr'], None)
 
                    v = settingValues.get(L9['value'], None)
0 comments (0 inline, 0 general)