# HG changeset patch
# User drewp@bigasterisk.com
# Date 2023-05-31 06:48:42
# Node ID 63aad60fb070a97d8a7746a80e21ef86444293b9
# Parent 33d1f00de39581bc6d5c2d8bac809f938157aeea
big effect rewrite: the effect functions & library
diff --git a/light9/effect/effect_function_library.py b/light9/effect/effect_function_library.py
new file mode 100644
--- /dev/null
+++ b/light9/effect/effect_function_library.py
@@ -0,0 +1,67 @@
+"""repo of the EffectFunctions in the graph. Includes URI->realPythonFunction"""
+import logging
+from dataclasses import dataclass, field
+from typing import Callable, Dict, List, Optional, Tuple, cast
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+from rdflib import RDF, RDFS, Literal, Namespace, URIRef
+from light9.namespaces import DEV, FUNC, L9
+from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, EffectFunction, EffectUri, HexColor, VTUnion)
+from light9.typedgraph import typedValue
+log = logging.getLogger('effectfuncs')
+from . import effect_functions
+@dataclass
+class _EffectFunctionInput:
+    """One :input entry of an :EffectFunction, as read from the graph."""
+    # value of the input node's :effectAttr
+    effectAttr: EffectAttr
+    # value of the input node's :defaultValue; read as `VTUnion | None`, so it may be None
+    defaultValue: Optional[VTUnion]
+@dataclass
+class _RdfEffectFunction:
+    """An :EffectFunction subject from the graph, with its label and declared inputs."""
+    # the subject URI that has rdf:type :EffectFunction
+    uri: EffectFunction
+    # rdfs:label of the function; read as `Literal | None`, so it may be None
+    label: Optional[Literal]
+    # one entry per :input object of the function
+    inputs: List[_EffectFunctionInput]
+@dataclass
+class EffectFunctionLibrary:
+    """parses :EffectFunction structures
+
+    `funcs` is rebuilt from the graph by `_compile`, which is registered as a
+    graph handler when the instance is created. `getFunc` maps a function URI
+    to the Python callable that implements it.
+    """
+    graph: SyncedGraph
+    funcs: List[_RdfEffectFunction] = field(default_factory=list)
+
+    def __post_init__(self):
+        # NOTE(review): presumably addHandler runs _compile right away and again
+        # when the graph data it read changes -- confirm against SyncedGraph.
+        self.graph.addHandler(self._compile)
+
+    def _compile(self):
+        """Rebuild self.funcs from every subject typed :EffectFunction."""
+        self.funcs = []
+        for subj in self.graph.subjects(RDF.type, L9['EffectFunction']):
+            label = typedValue(Literal | None, self.graph, subj, RDFS.label)
+            inputs = [
+                _EffectFunctionInput(
+                    typedValue(EffectAttr, self.graph, inp, L9['effectAttr']),
+                    typedValue(VTUnion | None, self.graph, inp, L9['defaultValue']),
+                ) for inp in self.graph.objects(subj, L9['input'])
+            ]
+            self.funcs.append(_RdfEffectFunction(cast(EffectFunction, subj), label, inputs))
+
+    def getFunc(self, uri: EffectFunction) -> Callable:
+        """Return the Python implementation registered for `uri`.
+
+        Raises KeyError when `uri` is not one of the functions listed here.
+        """
+        return {
+            FUNC['scale']: effect_functions.effect_scale,
+            FUNC['strobe']: effect_functions.effect_strobe,
+        }[uri]
+
+    def getDefaultValue(self, uri: EffectFunction, attr: EffectAttr) -> VTUnion:
+        """Return the :defaultValue of input `attr` on function `uri`.
+
+        Raises ValueError when no matching input carries a non-None default.
+        """
+        for f in self.funcs:
+            if f.uri != uri:
+                continue
+            for i in f.inputs:
+                if i.effectAttr == attr and i.defaultValue is not None:
+                    return i.defaultValue
+        raise ValueError(f'no default for {uri} {attr}')
\ No newline at end of file
diff --git a/light9/effect/effect_function_library_test.py b/light9/effect/effect_function_library_test.py
new file mode 100644
--- /dev/null
+++ b/light9/effect/effect_function_library_test.py
@@ -0,0 +1,40 @@
+from light9.effect.effect_function_library import EffectFunctionLibrary
+from light9.mock_syncedgraph import MockSyncedGraph
+from light9.namespaces import L9
+# Turtle source for the mock graph used by the test below.
+# NOTE(review): the `:`, `func:` and `xsd:` IRIs match light9/namespaces.py and
+# `rdfs:` is the W3C namespace; the `dev:` and `effect:` IRIs are assumed --
+# confirm them (neither prefix is used by the triples in this graph).
+GRAPH = '''
+@prefix : <http://light9.bigasterisk.com/> .
+@prefix dev: <http://light9.bigasterisk.com/device/> .
+@prefix effect: <http://light9.bigasterisk.com/effect/> .
+@prefix func: <http://light9.bigasterisk.com/effectFunction/> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+
+    func:scale
+      a :EffectFunction;
+      rdfs:label "a submaster- scales :deviceSettings";
+      :input
+        [ :effectAttr :strength; :defaultValue 0.0 ],
+        [ :effectAttr :deviceSettings; ] . # e.g. "par2 at color=red; par3 at color=white"
+
+    func:strobe
+      a :EffectFunction;
+      rdfs:label "blink specified devices";
+      :input
+        [ :effectAttr :strength; :defaultValue 0.0 ],
+        [ :effectAttr :period; :defaultValue 0.5 ],
+        [ :effectAttr :onTime; :defaultValue 0.1 ],
+        [ :effectAttr :deviceSettings ] .
+'''
+class TestParsesGraph:
+    """The library should list every :EffectFunction declared in GRAPH."""
+
+    def test(self):
+        library = EffectFunctionLibrary(MockSyncedGraph(GRAPH))
+        # GRAPH declares func:scale and func:strobe
+        assert len(library.funcs) == 2
\ No newline at end of file
diff --git a/light9/effect/effect_functions.py b/light9/effect/effect_functions.py
new file mode 100644
--- /dev/null
+++ b/light9/effect/effect_functions.py
@@ -0,0 +1,63 @@
+import logging
+import random
+from PIL import Image
+from webcolors import rgb_to_hex
+from light9.effect.scale import scale
+from light9.effect.settings import DeviceSettings
+from light9.namespaces import L9
+log = logging.getLogger('effectfunc')
+def sample8(img, x, y, repeat=False):
+    """Return the pixel of `img` at (x, y), or (0, 0, 0) when out of bounds.
+
+    A row outside the image always gives (0, 0, 0). A column outside the image
+    gives (0, 0, 0) unless `repeat` is true, in which case x wraps modulo the
+    image width.
+    """
+    black = (0, 0, 0)
+    if not (0 <= y < img.height):
+        return black
+    if not (0 <= x < img.width):
+        if not repeat:
+            return black
+        x = x % img.width
+    return img.getpixel((x, y))
+def effect_scale(strength: float, devs: DeviceSettings) -> DeviceSettings:
+    """Return `devs` with every value passed through scale(value, strength).
+
+    A strength of 0 skips the scaling and yields a DeviceSettings built from
+    an empty list.
+    """
+    if strength == 0:
+        scaled = []
+    else:
+        scaled = [(dev, attr, scale(val, strength)) for dev, attr, val in devs.asList()]
+    return DeviceSettings(devs.graph, scaled)
+def effect_strobe(
+        songTime: float,  #
+        strength: float,
+        period: float,
+        onTime: float,
+        devs: DeviceSettings) -> DeviceSettings:
+    """Scale `devs` by `strength` for the first `onTime` of every `period`, else by 0.
+
+    A period of 0 is treated as always off.
+    """
+    lit = period != 0 and (songTime % period) < onTime
+    return effect_scale(strength if lit else 0, devs)
+def effect_image(
+        songTime: float,  #
+        strength: float,
+        period: float,
+        image: Image.Image,
+        devs: DeviceSettings,
+) -> DeviceSettings:
+    """Color the :color settings in `devs` from a column of `image`.
+
+    The sampled column is int(songTime / period * image.width), wrapped
+    horizontally, so the image is traversed once per `period`. Each setting
+    samples the row equal to its position in devs.asOrderedList(); settings
+    for attributes other than :color are skipped but still count as a row.
+    The sampled color is scaled by `strength * v`, where v is the setting's
+    own value.
+
+    A period of 0 returns a DeviceSettings built from an empty list.
+    """
+    out = []
+    if period == 0:
+        # Same convention as effect_strobe: period 0 means no output, rather
+        # than a ZeroDivisionError in the column computation below.
+        return DeviceSettings(devs.graph, out)
+    x = int((songTime / period) * image.width)
+    for y, (d, da, v) in enumerate(devs.asOrderedList()):
+        if da != L9['color']:
+            continue
+        color8 = sample8(image, x, y, repeat=True)
+        # keep only r, g, b so images with an alpha band don't break rgb_to_hex
+        color = rgb_to_hex(tuple(color8[:3]))
+        out.append((d, da, scale(color, strength * v)))
+    return DeviceSettings(devs.graph, out)
\ No newline at end of file
diff --git a/light9/effect/effecteval.py b/light9/effect/effecteval.py
--- a/light9/effect/effecteval.py
+++ b/light9/effect/effecteval.py
@@ -114,11 +114,6 @@ class EffectEval2:
return self.function(inputs)
-def effect_scale(strength: float, devs: DeviceSettings) -> DeviceSettings:
- out = []
- for d, da, v in devs.asList():
- out.append((d, da, scale(v, strength)))
- return DeviceSettings(devs.graph, out)
diff --git a/light9/effect/scale.py b/light9/effect/scale.py
--- a/light9/effect/scale.py
+++ b/light9/effect/scale.py
@@ -1,26 +1,23 @@
from decimal import Decimal
-from rdflib import Literal
+from light9.newtypes import VTUnion
from webcolors import hex_to_rgb, rgb_to_hex
-def scale(value, strength):
- if isinstance(value, Literal):
- value = value.toPython()
+def scale(value:VTUnion, strength:float):
if isinstance(value, Decimal):
- value = float(value)
+ raise TypeError()
if isinstance(value, str):
if value[0] == '#':
if strength == '#ffffff':
return value
r, g, b = hex_to_rgb(value)
- if isinstance(strength, Literal):
- strength = strength.toPython()
- if isinstance(strength, str):
- sr, sg, sb = [v / 255 for v in hex_to_rgb(strength)]
- else:
+ # if isinstance(strength, Literal):
+ # strength = strength.toPython()
+ # if isinstance(strength, str):
+ # sr, sg, sb = [v / 255 for v in hex_to_rgb(strength)]
+ if True:
sr = sg = sb = strength
return rgb_to_hex((int(r * sr), int(g * sg), int(b * sb)))
elif isinstance(value, (int, float)):
diff --git a/light9/namespaces.py b/light9/namespaces.py
--- a/light9/namespaces.py
+++ b/light9/namespaces.py
@@ -18,6 +18,7 @@ class FastNs:
L9 = FastNs("http://light9.bigasterisk.com/")
+FUNC = FastNs("http://light9.bigasterisk.com/effectFunction/")
MUS = Namespace("http://light9.bigasterisk.com/music/")
XSD = Namespace("http://www.w3.org/2001/XMLSchema#")
DCTERMS = Namespace("http://purl.org/dc/terms/")
diff --git a/light9/newtypes.py b/light9/newtypes.py
--- a/light9/newtypes.py
+++ b/light9/newtypes.py
@@ -11,7 +11,7 @@ DeviceClass = NewType('DeviceClass', URI
DmxIndex = NewType('DmxIndex', int) # 1..512
DmxMessageIndex = NewType('DmxMessageIndex', int) # 0..511
DeviceAttr = NewType('DeviceAttr', URIRef) # e.g. :rx
-EffectClass = NewType('EffectClass', URIRef) # e.g. effect:chase
+EffectFunction = NewType('EffectFunction', URIRef) # e.g. func:strobe
EffectUri = NewType('EffectUri', URIRef) # unclear when to use this vs EffectClass
EffectAttr = NewType('EffectAttr', URIRef) # e.g. :chaseSpeed
NoteUri = NewType('NoteUri', URIRef)