Changeset - 92d97e17ca31
[Not reviewed]
default
0 5 1
drewp@bigasterisk.com - 20 months ago 2023-05-31 06:52:37
drewp@bigasterisk.com
big effecteval rewrite
6 files changed with 247 insertions and 131 deletions:
0 comments (0 inline, 0 general)
light9/effect/effecteval.py
Show inline comments
 
import logging
 
import math
 
import random
 
from colorsys import hsv_to_rgb
 
from dataclasses import dataclass
 
from typing import Dict, Optional, Tuple
 
from dataclasses import dataclass, field
 
import time
 
from typing import Callable, Dict, List, Optional, Tuple, cast
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.typedgraph import typedValue
 

	
 
from noise import pnoise1
 
from PIL import Image
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import RDF, Literal, Namespace, URIRef
 
from rdflib import RDF, RDFS, Literal, Namespace, URIRef
 
from webcolors import hex_to_rgb, rgb_to_hex
 

	
 
from light9.effect.scale import scale
 
from light9.effect.settings import BareEffectSettings, DeviceSettings, EffectSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from light9.namespaces import DEV, L9
 
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, EffectClass, EffectUri, VTUnion)
 
from light9.namespaces import DEV, L9, FUNC
 
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, 
 
                             EffectFunction, EffectUri, HexColor, VTUnion)
 

	
 
SKY = Namespace('http://light9.bigasterisk.com/theater/skyline/device/')
 

	
 
random.seed(0)
 

	
 
log = logging.getLogger('effecteval')
 
@@ -68,98 +70,12 @@ def clamp255(x):
 
def _8bit(f):
 
    if not isinstance(f, (int, float)):
 
        raise TypeError(repr(f))
 
    return clamp255(int(f * 255))
 

	
 

	
 
@dataclass
 
class EffectEval2:
 
    graph: SyncedGraph
 
    uri: EffectUri
 

	
 
    effectFunction: Optional[URIRef] = None
 
    isEffect = False
 

	
 
    def __post_init__(self):
 
        self.graph.addHandler(self._compile)
 
        self.effectFunction = L9['todo']
 

	
 
    def _compile(self):
 
        if not self.graph.contains((self.uri, RDF.type, L9['Effect'])):
 
            self.isEffect = False
 
            return
 
        self.isEffect = True
 

	
 
        self.function = effect_scale
 
        devs = []
 
        for s in self.graph.objects(self.uri, L9['setting']):
 
            d = typedValue(DeviceUri, self.graph, s, L9['device'])
 
            da = typedValue(DeviceAttr, self.graph, s, L9['deviceAttr'])
 
            v = typedValue(VTUnion, self.graph, s, L9['value'])
 
            devs.append((d, da, v))
 
        self.devs = DeviceSettings(self.graph, devs)
 

	
 
    def compute(self, inputs: EffectSettings) -> DeviceSettings:
 
        if not self.isEffect:
 
            return DeviceSettings(self.graph, [])
 

	
 
        s = 0
 
        for e, ea, v in inputs.asList():
 
            if not isinstance(v, float):
 
                raise TypeError
 
            if ea == L9['strength']:
 
                s = v
 

	
 
        return effect_scale(s, self.devs)
 

	
 
        return self.function(inputs)
 

	
 

	
 

	
 

	
 
@dataclass
 
class EffectEval:
 
    """
 
    runs one effect's code to turn effect attr settings into output
 
    device settings. No effect state; suitable for reload().
 
    """
 
    graph: SyncedGraph
 
    effect: EffectClass
 
    simpleOutputs: SimpleOutputs
 

	
 
    def outputFromEffect(self, effectSettings: BareEffectSettings, songTime: float, noteTime: float) -> Tuple[DeviceSettings, Dict]:
 
        """
 
        From effect attr settings, like strength=0.75, to output device
 
        settings like light1/bright=0.72;light2/bright=0.78. This runs
 
        the effect code.
 
        """
 
        # todo: what does the next comment line mean?
 
        # both callers need to apply note overrides
 

	
 
        strength = float(effectSettings.s[EffectAttr(L9['strength'])])
 
        if strength <= 0:
 
            return DeviceSettings(self.graph, []), {'zero': True}
 

	
 
        report = {}
 
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 

	
 
        out.update(self.simpleOutputs.values(self.effect, strength, effectSettings.s.get(EffectAttr(L9['colorScale']), None)))
 

	
 
        if self.effect.startswith(L9['effect/']):
 
            tail = 'effect_' + self.effect[len(L9['effect/']):]
 
            try:
 
                func = globals()[tail]
 
            except KeyError:
 
                report['error'] = 'effect code not found for %s' % self.effect
 
            else:
 
                out.update(func(effectSettings, strength, songTime, noteTime))
 

	
 
        outList = [(d, a, v) for (d, a), v in out.items()]
 
        return DeviceSettings(self.graph, outList), report
 

	
 

	
 
def effect_Curtain(effectSettings, strength, songTime, noteTime):
 
    return {(L9['device/lowPattern%s' % n], L9['color']): literalColor(strength, strength, strength) for n in range(301, 308 + 1)}
 

	
 

	
 
def effect_animRainbow(effectSettings, strength, songTime, noteTime):
 
    out = {}
light9/effect/effecteval2.py
Show inline comments
 
new file 100644
 
import inspect
 
import logging
 
from dataclasses import dataclass, field
 
from typing import Callable, Dict, List, Optional, Tuple, cast
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import RDF, RDFS, Literal, Namespace, URIRef
 

	
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.effect.settings import (BareEffectSettings, DeviceSettings, EffectSettings)
 
from light9.namespaces import DEV, FUNC, L9
 
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, EffectFunction, EffectUri, HexColor, VTUnion)
 
from light9.typedgraph import typedValue
 
from rdflib.term import Node
 

	
 
log = logging.getLogger('effecteval')
 

	
 
@dataclass
 
class Config:
 
    effectFunction: EffectFunction
 
    esettings: EffectSettings
 
    devSettings: Optional[DeviceSettings]# the EffectSettings :effectAttr :devSettings item, if there was one
 
    func: Callable
 
    funcArgs: List[inspect.Parameter]
 

	
 
@dataclass
 
class EffectEval2:
 
    """Runs one effect code to turn EffectSettings (e.g. strength) into DeviceSettings"""
 
    graph: SyncedGraph
 
    uri: EffectUri
 
    lib: EffectFunctionLibrary
 

	
 
    config:Optional[Config]=None
 

	
 
    def __post_init__(self):
 
        self.graph.addHandler(self._compile)
 

	
 
    def _compile(self):
 
        self.effectFunction = None
 
        if not self.graph.contains((self.uri, RDF.type, L9['Effect'])):
 
            return
 

	
 
        try:
 
            effectFunction = typedValue(EffectFunction, self.graph, self.uri, L9['effectFunction'])
 
            effSets = []
 
            devSettings = None
 
            for s in self.graph.objects(self.uri, L9['setting']):
 
                attr = typedValue(EffectAttr, self.graph, s, L9['effectAttr'])
 
                if attr == L9['deviceSettings']:
 
                    value = typedValue(Node, self.graph, s, L9['value'])
 

	
 
                    rows = []
 
                    for ds in self.graph.objects(value, L9['setting']):
 
                        d = typedValue(DeviceUri, self.graph, ds, L9['device'])
 
                        da = typedValue(DeviceAttr, self.graph, ds, L9['deviceAttr'])
 
                        v = typedValue(VTUnion, self.graph, ds, L9['value'])
 
                        rows.append((d, da, v))
 
                    devSettings = DeviceSettings(self.graph, rows)
 
                else:
 
                    value = typedValue(VTUnion, self.graph, s, L9['value'])
 
                    effSets.append((self.uri, attr, value))
 
            esettings = EffectSettings(self.graph, effSets)
 

	
 
            try:
 
                effectFunction = typedValue(EffectFunction, self.graph, self.uri, L9['effectFunction'])
 
            except ValueError:
 
                raise ValueError(f'{self.uri} has no :effectFunction')
 
            func = self.lib.getFunc(effectFunction)
 

	
 
            # This should be in EffectFunctionLibrary
 
            funcArgs = list(inspect.signature(func).parameters.values())
 

	
 
            self.config = Config(effectFunction, esettings, devSettings, func, funcArgs)
 
        except Exception:
 
            log.error(f"while compiling {self.uri}")
 
            raise
 

	
 

	
 
    def compute(self, songTime: float, inputs: EffectSettings) -> DeviceSettings:
 
        """
 
        calls our function using inputs (publishedAttr attrs, e.g. :strength) 
 
        and effect-level settings including a special attr called :deviceSettings 
 
        with DeviceSettings as its value
 
        """
 
        if self.config is None:
 
            return DeviceSettings(self.graph, [])
 

	
 
        c = self.config
 
        kw = {}
 
        for arg in c.funcArgs:
 
            if arg.annotation == DeviceSettings:
 
                v = c.devSettings
 
            elif arg.name == 'songTime':
 
                v = songTime
 
            else:
 
                eaForName = EffectAttr(L9[arg.name])
 
                v = self._getEffectAttrValue(eaForName, inputs)
 

	
 
            kw[arg.name] = v
 
        log.debug('calling %s with %s', c.func, kw)
 
        return c.func(**kw)
 

	
 
    def _getEffectAttrValue(self, attr: EffectAttr, inputs: EffectSettings) -> VTUnion:
 
        c=self.config
 
        if c is None: raise
 
        try:
 
            return inputs.getValue(self.uri, attr, defaultToZero=False)
 
        except KeyError:
 
            pass
 
        try:
 
            return  c.esettings.getValue(self.uri, attr, defaultToZero=False)
 
        except KeyError:
 
            pass
 
        return self.lib.getDefaultValue(c.effectFunction, attr)
 
        
 
\ No newline at end of file
light9/effect/effecteval_test.py
Show inline comments
 
from light9.effect.effecteval import EffectEval2
 
from typing import List, Tuple
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.effect.effecteval2 import EffectEval2
 
from light9.effect.settings import DeviceSettings, EffectSettings
 
from light9.mock_syncedgraph import MockSyncedGraph
 
from light9.namespaces import DEV, L9
 
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, EffectUri, HexColor)
 
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, EffectUri, HexColor, VTUnion)
 
import pytest
 

	
 
PREFIX = '''
 
    @prefix : <http://light9.bigasterisk.com/> .
 
    @prefix dev: <http://light9.bigasterisk.com/device/> .
 
    @prefix effect: <http://light9.bigasterisk.com/effect/> .
 

	
 
    @prefix func: <http://light9.bigasterisk.com/effectFunction/> .
 
    @prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
 
'''
 

	
 
THEATER = PREFIX + '''
 
    :effect1
 
GRAPH = PREFIX + '''
 

	
 
    func:scale
 
        a :EffectFunction;
 
        rdfs:label "a submaster- scales :deviceSettings";
 
        :input 
 
            [ :effectAttr :strength; :defaultValue 0.0 ],
 
            [ :effectAttr :deviceSettings; ] . # e.g. "par2 at color=red; par3 at color=white"
 

	
 
    func:strobe
 
        a :EffectFunction;
 
        rdfs:label "blink specified devices";
 
        :input 
 
            [ :effectAttr :strength; :defaultValue 0.0 ],
 
            [ :effectAttr :period; :defaultValue 0.5 ],
 
            [ :effectAttr :onTime; :defaultValue 0.1 ],
 
            [ :effectAttr :deviceSettings ] .
 

	
 
    func:image
 
        a :EffectFunction;
 
        rdfs:label "sample image at x=time";
 
        :input
 
            [ :effectAttr :strength; :defaultValue 0.0 ],
 
            [ :effectAttr :period; :defaultValue 2.0 ],
 
            [ :effectAttr :image; :defaultValue "specks.png" ],
 
            [ :effectAttr :deviceSettings; rdfs:comment "these might have a :sort key or a :y value" ] .
 
            
 

	
 
    :effectSub
 
        a :Effect;
 
        :effectFunction effect:scale;
 
        :input [ :effectAttr :strength ];
 
        :setting [ :device dev:light1; :deviceAttr :color; :value "#ff0000" ] .
 
        :effectFunction func:scale;
 
        :publishAttr :strength;
 
        :setting [ :effectAttr :deviceSettings; :value [
 
                   :setting [ :device dev:light1; :deviceAttr :color; :value "#ff0000" ]  ] ].
 

	
 
    :effectDefaultStrobe
 
        a :Effect;
 
        :effectFunction func:strobe;
 
        :publishAttr :strength;
 
        :setting [ :effectAttr :deviceSettings; :value [
 
                   :setting [ :device dev:light1; :deviceAttr :color; :value "#ff0000" ] ] ].
 
        
 
    :effectCustomStrobe
 
        a :Effect;
 
        :effectFunction func:strobe;
 
        :publishAttr :strength;
 
        :setting 
 
            [ :effectAttr :period; :value 3.0],
 
            [ :effectAttr :onTime; :value 0.5],
 
            [ :effectAttr :deviceSettings; :value [
 
              :setting [ :device dev:light1; :deviceAttr :color; :value "#ff0000" ] ] ].
 
'''
 
effect1 = EffectUri(L9['effect1'])
 

	
 
effectSub = EffectUri(L9['effectSub'])
 
effectDefaultStrobe = EffectUri(L9['effectDefaultStrobe'])
 
effectCustomStrobe = EffectUri(L9['effectCustomStrobe'])
 

	
 

	
 
def light1At(col: str) -> List[Tuple[DeviceUri, DeviceAttr, VTUnion]]:
 
    return [(DeviceUri(DEV['light1']), DeviceAttr(L9['color']), HexColor(col))]
 

	
 

	
 
@pytest.fixture
 
def effectFunctions():
 
    g = MockSyncedGraph(GRAPH)
 
    return EffectFunctionLibrary(g)
 

	
 

	
 
class TestEffectEval:
 

	
 
    def test_scalesColors(self):
 
        g = MockSyncedGraph(THEATER)
 
        ee = EffectEval2(g, effect1)
 
        s = EffectSettings(g, [(effect1, EffectAttr(L9['strength']), 0.5)])
 
        ds = ee.compute(s)
 
        assert ds == DeviceSettings(g, [(DeviceUri(DEV['light1']), DeviceAttr(L9['color']), HexColor('#7f0000'))])
 
    def test_scalesColors(self, effectFunctions):
 
        g = effectFunctions.graph
 
        ee = EffectEval2(g, effectSub, effectFunctions)
 
        s = EffectSettings(g, [(effectSub, EffectAttr(L9['strength']), 0.5)])
 
        ds = ee.compute(songTime=0, inputs=s)
 
        assert ds == DeviceSettings(g, light1At('#7f0000'))
 

	
 
    def test_cullsZeroOutputs(self, effectFunctions):
 
        g = effectFunctions.graph
 
        ee = EffectEval2(g, effectSub, effectFunctions)
 
        s = EffectSettings(g, [(effectSub, EffectAttr(L9['strength']), 0.0)])
 
        ds = ee.compute(songTime=0, inputs=s)
 
        assert ds == DeviceSettings(g, [])
 

	
 
    def test_strobeDefaults(self, effectFunctions):
 
        g = effectFunctions.graph
 
        ee = EffectEval2(g, effectDefaultStrobe, effectFunctions)
 
        s = EffectSettings(g, [(effectDefaultStrobe, EffectAttr(L9['strength']), 1.0)])
 
        assert ee.compute(songTime=0, inputs=s) == DeviceSettings(g, light1At('#ff0000'))
 
        assert ee.compute(songTime=.25, inputs=s) == DeviceSettings(g, [])
 

	
 
    def strobeMultsStrength(self, effectFunctions):
 
        g = effectFunctions.graph
 
        ee = EffectEval2(g, effectDefaultStrobe, effectFunctions)
 
        s = EffectSettings(g, [(effectDefaultStrobe, EffectAttr(L9['strength']), 0.5)])
 
        assert ee.compute(songTime=0, inputs=s) == DeviceSettings(g, light1At('#7f0000'))
 

	
 
    def test_strobeCustom(self, effectFunctions):
 
        g = effectFunctions.graph
 
        ee = EffectEval2(g, effectCustomStrobe, effectFunctions)
 
        s = EffectSettings(g, [(effectCustomStrobe, EffectAttr(L9['strength']), 1.0)])
 
        assert ee.compute(songTime=0, inputs=s) == DeviceSettings(g, light1At('#ff0000'))
 
        assert ee.compute(songTime=.25, inputs=s) == DeviceSettings(g, light1At('#ff0000'))
 
        assert ee.compute(songTime=.6, inputs=s) == DeviceSettings(g, [])
light9/effect/sequencer/eval_faders.py
Show inline comments
 
import logging
 
import time
 
from dataclasses import dataclass
 
from typing import List, Optional, cast
 

	
 
from prometheus_client import Summary
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.effect.effecteval2 import EffectEval2
 

	
 
from rdfdb import SyncedGraph
 
from rdflib import URIRef
 
from rdflib.term import Node
 

	
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings, EffectSettings
 
from light9.metrics import metrics
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import EffectAttr, EffectUri, UnixTime
 
from light9.typedgraph import typedValue
 

	
 
@@ -20,48 +21,46 @@ log = logging.getLogger('seq.fader')
 

	
 
COMPILE=Summary('compile_graph_fader', '')
 

	
 
@dataclass
 
class Fader:
 
    graph: SyncedGraph
 
    lib: EffectFunctionLibrary
 
    uri: URIRef
 
    effect: EffectUri
 
    setEffectAttr: EffectAttr
 

	
 
    value: Optional[float]=None # mutable
 

	
 
    def __post_init__(self):
 
        self.ee = effecteval.EffectEval2(self.graph, self.effect)
 
        self.ee = EffectEval2(self.graph, self.effect, self.lib)
 

	
 
class FaderEval:
 
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector
 

	
 
    """
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 lib: EffectFunctionLibrary
 
                 ):
 
        self.graph = graph
 
        self.lib = lib
 
        self.faders: List[Fader] = []
 

	
 
        log.info('fader adds handler')
 
        self.graph.addHandler(self._compile)
 
        self.lastLoopSucceeded = False
 

	
 
    def onCodeChange(self):
 
        log.debug('seq.onCodeChange')
 
        self.graph.addHandler(self._compile)
 

	
 
    @COMPILE.time()
 
    def _compile(self) -> None:
 
        """rebuild our data from the graph"""
 
        self.faders = []
 
        for fader in self.graph.subjects(RDF.type, L9['Fader']):
 
            effect = typedValue(EffectUri, self.graph, fader, L9['effect'])
 
            setting = typedValue(Node, self.graph, fader, L9['setting'])
 
            setAttr = typedValue(EffectAttr,  self.graph, setting, L9['effectAttr'])
 
            self.faders.append(Fader(self.graph, cast(URIRef, fader), effect, setAttr))
 
            self.faders.append(Fader(self.graph, self.lib, cast(URIRef, fader), effect, setAttr))
 

	
 
        # this could go in a second, smaller addHandler call to avoid rebuilding Fader objs constantly
 
        for f in self.faders:
 
            setting = typedValue(Node, self.graph, f.uri, L9['setting'])            
 
            f.value = typedValue(float, self.graph, setting, L9['value'])
 

	
 
@@ -70,21 +69,10 @@ class FaderEval:
 
        now = UnixTime(time.time())
 
        for f in self.faders:
 
            if f.value is None:
 
                raise TypeError('f.value should be set by now')
 
            effectSettings = EffectSettings(self.graph, [(f.effect, f.setEffectAttr, f.value)])
 

	
 
            if f.value < .001:
 
                continue
 

	
 
            faderEffectOutputs.append(f.ee.compute(effectSettings))
 
            ds = f.ee.compute(now, effectSettings)
 
            faderEffectOutputs.append(ds)
 

	
 
            # ee = effecteval.EffectEval(self.graph, f.effectClass, self.simpleOutputs)
 
            # deviceSettings, report = ee.outputFromEffect(
 
            #     effectSettings,
 
            #     songTime=now, # probably wrong
 
            #     noteTime=now, # wrong
 
            #     )
 
            # log.info(f'  𝅘𝅥𝅮  {uriTail(f.uri)}: {effectSettings=} -> {deviceSettings=}')
 
            # if deviceSettings:
 
            #     notesSettings.append(deviceSettings)
 
        return DeviceSettings.merge(self.graph, faderEffectOutputs)
light9/effect/sequencer/service.py
Show inline comments
 
@@ -3,12 +3,13 @@ plays back effect notes from the timelin
 
"""
 

	
 
import asyncio
 
import json
 
import logging
 
import time
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 

	
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.applications import Starlette
 
from starlette.routing import Route
 
@@ -49,15 +50,17 @@ async def send_page_updates(request):
 

	
 
def main():
 
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)
 
    logging.getLogger('effecteval').setLevel(logging.INFO)
 

	
 
    # seq = Sequencer(graph, send)  # per-song timed notes
 
    faders = FaderEval(graph)  # bin/fade's untimed effects
 
    lib = EffectFunctionLibrary(graph)
 
    faders = FaderEval(graph, lib)  # bin/fade's untimed effects
 

	
 
    #@metrics('computeAndSend').time() # needs rework with async
 
    async def update(first_run):
 
        ds = faders.computeOutput()
 
        await sendToCollector('effectSequencer', session='0', settings=ds)
 

	
light9/effect/settings.py
Show inline comments
 
@@ -168,14 +168,18 @@ class _Settings:
 

	
 
        accum()
 
        if not words:
 
            words = ['(no settings)']
 
        return '<%s %s>' % (self.__class__.__name__, ' '.join(words))
 

	
 
    def getValue(self, dev: EntityType, attr: AttrType):
 
        return self._compiled.get(dev, {}).get(attr, self._zeroForAttr(attr))
 
    def getValue(self, dev: EntityType, attr: AttrType, defaultToZero=True):
 
        x = self._compiled.get(dev, {})
 
        if defaultToZero:
 
            return x.get(attr, self._zeroForAttr(attr))
 
        else:
 
            return x[attr]
 

	
 
    def _vectorKeys(self, deviceAttrFilter=None):
 
        """stable order of all the dev,attr pairs for this type of settings"""
 
        raise NotImplementedError
 

	
 
    def asList(self) -> List[Tuple[EntityType, AttrType, VTUnion]]:
0 comments (0 inline, 0 general)