big effect rewrite: the effect functions & library
"""repo of the EffectFunctions in the graph. Includes URI->realPythonFunction"""
import logging
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple, cast

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import RDF, RDFS, Literal, Namespace, URIRef

from light9.namespaces import DEV, FUNC, L9
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, EffectFunction, EffectUri, HexColor, VTUnion)
from light9.typedgraph import typedValue

log = logging.getLogger('effectfuncs')

from . import effect_functions


class _EffectFunctionInput:
    effectAttr: EffectAttr
    defaultValue: Optional[VTUnion]


class _RdfEffectFunction:
    uri: EffectFunction
    label: Optional[Literal]
    inputs: List[_EffectFunctionInput]


class EffectFunctionLibrary:
    """parses :EffectFunction structures"""
    graph: SyncedGraph

    funcs: List[_RdfEffectFunction] = field(default_factory=list)

    def __post_init__(self):

    def _compile(self):
        self.funcs = []
        for subj in self.graph.subjects(RDF.type, L9['EffectFunction']):
            label = typedValue(Literal | None, self.graph, subj, RDFS.label)
            inputs = []
            for inp in self.graph.objects(subj, L9['input']):
                    _EffectFunctionInput(  #
                        typedValue(EffectAttr, self.graph, inp, L9['effectAttr']),  #
                        typedValue(VTUnion | None, self.graph, inp, L9['defaultValue'])))

            self.funcs.append(_RdfEffectFunction(cast(EffectFunction, subj), label, inputs))

    def getFunc(self, uri: EffectFunction) -> Callable:
        return {
            FUNC['scale']: effect_functions.effect_scale,
            FUNC['strobe']: effect_functions.effect_strobe,
    def getDefaultValue(self, uri: EffectFunction, attr:EffectAttr) -> VTUnion:
        for f in self.funcs:
            if f.uri == uri:
                for i in f.inputs:
                    if i.effectAttr==attr:
                        if i.defaultValue is not None:
                            return i.defaultValue
        raise ValueError(f'no default for {uri} {attr}')
\ No newline at end of file
from light9.effect.effect_function_library import EffectFunctionLibrary

from light9.mock_syncedgraph import MockSyncedGraph
from light9.namespaces import L9

@prefix : <> .
@prefix dev: <> .
@prefix effect: <> .
@prefix func: <> .
@prefix rdfs: <> .
@prefix xsd: <> .


        a :EffectFunction;
        rdfs:label "a submaster- scales :deviceSettings";
            [ :effectAttr :strength; :defaultValue 0.0 ],
            [ :effectAttr :deviceSettings; ] . # e.g. "par2 at color=red; par3 at color=white"

        a :EffectFunction;
        rdfs:label "blink specified devices";
            [ :effectAttr :strength; :defaultValue 0.0 ],
            [ :effectAttr :period; :defaultValue 0.5 ],
            [ :effectAttr :onTime; :defaultValue 0.1 ],
            [ :effectAttr :deviceSettings ] .



class TestParsesGraph:
    def test(self):
        g = MockSyncedGraph(GRAPH)
        lib = EffectFunctionLibrary(g)
        assert len(lib.funcs) == 2
\ No newline at end of file
import logging
import random

from PIL import Image
from webcolors import rgb_to_hex

from light9.effect.scale import scale
from light9.effect.settings import DeviceSettings
from light9.namespaces import L9


log = logging.getLogger('effectfunc')


def sample8(img, x, y, repeat=False):
    if not (0 <= y < img.height):
        return (0, 0, 0)
    if 0 <= x < img.width:
        return img.getpixel((x, y))
    elif not repeat:
        return (0, 0, 0)
        return img.getpixel((x % img.width, y))


def effect_scale(strength: float, devs: DeviceSettings) -> DeviceSettings:
    out = []
    if strength != 0:
        for d, da, v in devs.asList():
            out.append((d, da, scale(v, strength)))
    return DeviceSettings(devs.graph, out)


def effect_strobe(
        songTime: float,  #
        strength: float,
        period: float,
        onTime: float,
        devs: DeviceSettings) -> DeviceSettings:
    if period == 0:
        scl = strength  if (songTime % period) < onTime else 0
    return effect_scale(scl, devs)


def effect_image(
    songTime: float,  #
    strength: float,
    period: float,
    image: Image.Image,
    devs: DeviceSettings,
) -> DeviceSettings:
    x = int((songTime / period) * image.width)
    out = []
    for y, (d, da, v) in enumerate(devs.asOrderedList()):
        if da != L9['color']:
        color8 = sample8(image, x, y, repeat=True)
        color = rgb_to_hex(tuple(color8))
        out.append((d, da, scale(color, strength * v)))
    return DeviceSettings(devs.graph, out)
\ No newline at end of file
        return self.function(inputs)


def effect_scale(strength: float, devs: DeviceSettings) -> DeviceSettings:
    out = []
    for d, da, v in devs.asList():
        out.append((d, da, scale(v, strength)))
    return DeviceSettings(devs.graph, out)


from decimal import Decimal

from rdflib import Literal
from light9.newtypes import VTUnion
from webcolors import hex_to_rgb, rgb_to_hex


def scale(value, strength):
    if isinstance(value, Literal):
        value = value.toPython()

def scale(value:VTUnion, strength:float):
    if isinstance(value, Decimal):
        value = float(value)
        raise TypeError()

    if isinstance(value, str):
        if value[0] == '#':
            if strength == '#ffffff':
                return value
            r, g, b = hex_to_rgb(value)
            if isinstance(strength, Literal):
                strength = strength.toPython()
            if isinstance(strength, str):
                sr, sg, sb = [v / 255 for v in hex_to_rgb(strength)]
            # if isinstance(strength, Literal):
            #     strength = strength.toPython()
            # if isinstance(strength, str):
            #     sr, sg, sb = [v / 255 for v in hex_to_rgb(strength)]
            if True:
                sr = sg = sb = strength
            return rgb_to_hex((int(r * sr), int(g * sg), int(b * sb)))
    elif isinstance(value, (int, float)):
L9 = FastNs("")
FUNC = FastNs("")
MUS = Namespace("")
XSD = Namespace("")
DCTERMS = Namespace("")
@@ -11,7 +11,7 @@ DeviceClass = NewType('DeviceClass', URI
DmxIndex = NewType('DmxIndex', int)  # 1..512
DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
EffectClass = NewType('EffectClass', URIRef)  # e.g. effect:chase
EffectFunction = NewType('EffectFunction', URIRef)  # e.g. func:strobe
EffectUri = NewType('EffectUri', URIRef)  # unclear when to use this vs EffectClass
EffectAttr = NewType('EffectAttr', URIRef)  # e.g. :chaseSpeed
NoteUri = NewType('NoteUri', URIRef)
