diff --git a/light9/effect/effecteval.py b/light9/effect/effecteval.py
--- a/light9/effect/effecteval.py
+++ b/light9/effect/effecteval.py
@@ -1,15 +1,21 @@
-from rdflib import Literal, URIRef, Namespace
-from light9.namespaces import L9, DEV
-from webcolors import rgb_to_hex, hex_to_rgb
+from dataclasses import dataclass
+import logging
+import math
+import random
from colorsys import hsv_to_rgb
-import math
+from typing import Any, Dict, Tuple
+from light9.effect.simple_outputs import SimpleOutputs
+from light9.newtypes import DeviceAttr, DeviceUri, EffectAttr, EffectClass, VTUnion
+
from noise import pnoise1
-import logging
-from light9.effect.settings import DeviceSettings
+from PIL import Image
+from rdflib import Literal, Namespace, URIRef
+from webcolors import hex_to_rgb, rgb_to_hex
+
from light9.effect.scale import scale
-from typing import Dict, Tuple, Any
-from PIL import Image
-import random
+from light9.effect.settings import BareEffectSettings, DeviceSettings, EffectSettings
+from light9.namespaces import DEV, L9
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
SKY = Namespace('http://light9.bigasterisk.com/theater/skyline/device/')
@@ -64,34 +70,32 @@ def _8bit(f):
return clamp255(int(f * 255))
+@dataclass
class EffectEval:
"""
runs one effect's code to turn effect attr settings into output
device settings. No state; suitable for reload().
"""
+ graph: SyncedGraph
+ effect: EffectClass
+ simpleOutputs: SimpleOutputs
- def __init__(self, graph, effect, simpleOutputs):
- self.graph = graph
- self.effect = effect
- self.simpleOutputs = simpleOutputs
-
- def outputFromEffect(self, effectSettings, songTime, noteTime):
+ def outputFromEffect(self, effectSettings: BareEffectSettings, songTime: float, noteTime: float) -> Tuple[EffectSettings, Dict]:
"""
From effect attr settings, like strength=0.75, to output device
settings like light1/bright=0.72;light2/bright=0.78. This runs
the effect code.
"""
# both callers need to apply note overrides
- effectSettings = dict(effectSettings) # we should make everything into nice float and Color objects too
-
- strength = float(effectSettings[L9['strength']])
+ print(f'outputFromEffect({effectSettings=}, {songTime=}, {noteTime=})')
+ strength = float(effectSettings.s[EffectAttr(L9['strength'])])
if strength <= 0:
- return DeviceSettings(self.graph, []), {'zero': True}
+ return EffectSettings(self.graph, []), {'zero': True}
report = {}
- out: Dict[Tuple[URIRef, URIRef], Any] = {} # (dev, attr): value
+ out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {} # (dev, attr): value
- out.update(self.simpleOutputs.values(self.effect, strength, effectSettings.get(L9['colorScale'], None)))
+ out.update(self.simpleOutputs.values(self.effect, strength, effectSettings.s.get(EffectAttr(L9['colorScale']), None)))
if self.effect.startswith(L9['effect/']):
tail = 'effect_' + self.effect[len(L9['effect/']):]
@@ -103,7 +107,8 @@ class EffectEval:
out.update(func(effectSettings, strength, songTime, noteTime))
outList = [(d, a, v) for (d, a), v in out.items()]
- return DeviceSettings(self.graph, outList), report
+ print(f'{outList=}')
+ return EffectSettings(self.graph, outList), report
def effect_Curtain(effectSettings, strength, songTime, noteTime):
diff --git a/light9/effect/sequencer/eval_faders.py b/light9/effect/sequencer/eval_faders.py
--- a/light9/effect/sequencer/eval_faders.py
+++ b/light9/effect/sequencer/eval_faders.py
@@ -1,4 +1,3 @@
-
import asyncio
import logging
import time
@@ -13,9 +12,9 @@ from light9.effect.settings import Devic
from light9.effect.simple_outputs import SimpleOutputs
from light9.metrics import metrics
from light9.namespaces import L9, RDF
-from light9.newtypes import NoteUri
+from light9.newtypes import NoteUri, UnixTime
-log = logging.getLogger('sequencer')
+log = logging.getLogger('seq.fader')
class FaderEval:
"""peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector
@@ -37,8 +36,8 @@ class FaderEval:
self.lastLoopSucceeded = False
# self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
- log.info('startupdating task')
- asyncio.create_task(self.startUpdating())
+ # have caller do this
+ #asyncio.create_task(self.startUpdating())
async def startUpdating(self):
await self.graph.addAsyncHandler(self.update)
@@ -58,25 +57,34 @@ class FaderEval:
return self.compileFader(cast(URIRef, fader))
self.notes.append(compileFader())
- if self.notes:
- asyncio.create_task(self.startUpdating())
+ # if self.notes:
+ # asyncio.create_task(self.startUpdating())
@metrics('compile_fader').time()
def compileFader(self, fader: URIRef) -> Note:
- return Note(self.graph, NoteUri(cast(NoteUri, fader)), effecteval,
- self.simpleOutputs, timed=False)
+ return Note(self.graph, cast(NoteUri, fader),
+ timed=False)
+ def _computeOutput(self) -> DeviceSettings:
+ notesSettings = []
+ now = UnixTime(time.time())
+ for note in self.notes:
+ effectSettings, report = note.outputCurrent()
+
+ ee = effecteval.EffectEval(self.graph, note.effectClass, self.simpleOutputs)
+ deviceSettings, report = ee.outputFromEffect(
+ effectSettings,
+ songTime=now, # probably wrong
+ noteTime=now, # wrong
+ )
+ notesSettings.append(deviceSettings)
+ return DeviceSettings.merge(self.graph, notesSettings)
+
+
@metrics('update_call_fader').time()
async def update(self):
- settings = []
- for note in self.notes:
- effectValue = self.graph.value(note.uri, L9['value'])
- if effectValue is None:
- log.info(f'skip note {note}, no :value')
- continue
- s, report = note.outputSettings(t=time.time(), strength=float(effectValue))
- settings.append(s)
- devSettings = DeviceSettings.fromList(self.graph, settings)
+ log.info(f'update {len(self.notes)=}')
+ devSettings = self._computeOutput()
with metrics('update_s3_send_fader').time(): # our measurement
sendSecs = await self.sendToCollector(devSettings)
diff --git a/light9/effect/sequencer/eval_faders_test.py b/light9/effect/sequencer/eval_faders_test.py
new file mode 100644
--- /dev/null
+++ b/light9/effect/sequencer/eval_faders_test.py
@@ -0,0 +1,61 @@
+from unittest import mock
+from light9.effect.sequencer.eval_faders import FaderEval
+from light9.effect.settings import DeviceSettings
+from light9.mock_syncedgraph import MockSyncedGraph
+from light9.namespaces import L9
+
+
+PREFIXES = '''
+@prefix : .
+@prefix effect: .
+@prefix rdfs: .
+@prefix show: .
+@prefix xsd: .
+@prefix dev: .
+@prefix dmxA: .
+'''
+
+NOTE_GRAPH = PREFIXES + '''
+ :brightness
+ a :DeviceAttr;
+ rdfs:label "brightness";
+ :dataType :scalar .
+
+ :strength
+ a :EffectAttr;
+ rdfs:label "strength" .
+
+ :SimpleDimmer
+ a :DeviceClass;
+ rdfs:label "SimpleDimmer";
+ :deviceAttr :brightness;
+ :attr [ :outputAttr :level; :dmxOffset 0 ] .
+
+ :light1
+ a :SimpleDimmer;
+ :dmxUniverse dmxA:;
+ :dmxBase 178 .
+
+ effect:effect1
+ a :EffectClass;
+ :setting effect:effect1_set1 .
+ effect:effect1_set1
+ :device :light1;
+ :deviceAttr :brightness;
+ :scaledValue 0.5 .
+ :fade1
+ a :Fader;
+ :effectClass effect:effect1;
+ :effectAttr :strength;
+ :value 0.6 .
+
+ '''
+
+class TestFaderEval:
+ def test_faderValueScalesEffectSettings(self):
+ g = MockSyncedGraph(NOTE_GRAPH)
+ sender = mock.MagicMock()
+
+ f = FaderEval(g, sender)
+ devSettings = f._computeOutput()
+ assert devSettings == DeviceSettings(g, [(L9['light1'], L9['brightness'], 0.3)])
\ No newline at end of file
diff --git a/light9/effect/sequencer/note.py b/light9/effect/sequencer/note.py
--- a/light9/effect/sequencer/note.py
+++ b/light9/effect/sequencer/note.py
@@ -1,14 +1,18 @@
import bisect
+from dataclasses import dataclass
import logging
import time
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple, Union, cast
+from light9.effect.effecteval import EffectEval
+from light9.effect.settings import BareEffectSettings, DeviceSettings, EffectSettings
+from light9.effect.simple_outputs import SimpleOutputs
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import Literal, URIRef
from light9.namespaces import L9
-from light9.newtypes import Curve, DeviceAttr, DeviceUri, NoteUri, typedValue
+from light9.newtypes import Curve, DeviceAttr, EffectAttr, EffectClass, NoteUri, VTUnion, typedValue
log = logging.getLogger('sequencer')
@@ -20,32 +24,64 @@ def pyType(n):
return ret
+def prettyFormat(x: Union[float, str]):
+ if isinstance(x, float):
+ return round(x, 4)
+ return x
+
+
+@dataclass
class Note:
+ """A Note outputs EffectAttr settings.
+
+ Sample graph:
+ :note1 a :Note; :curve :n1c1; :effectClass effect:allcolor;
- def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs, timed=True):
- g = self.graph = graph
- self.uri = uri
- self.timed = timed
- self.effectEval = effectevalModule.EffectEval(graph, g.value(uri, L9['effectClass']), simpleOutputs)
- self.baseEffectSettings: Dict[URIRef, Any] = {} # {effectAttr: value}
- for s in g.objects(uri, L9['setting']):
- settingValues = dict(g.predicate_objects(s))
- ea = cast(URIRef, settingValues[L9['effectAttr']])
- self.baseEffectSettings[ea] = pyType(settingValues[L9['value']])
+ It can animate the EffectAttr settings over time, in two ways:
+ * a `timed note` has an envelope curve that animates
+ the :strength EffectAttr over time
+ * an `untimed note` has no curve, a fixed strength, but
+ still passes the wall clock time to its effect so the
+ effect can include animation. A `Fader` is an untimed note.
+
+ This obj is immutable, I think, but the graph can change,
+ which can affect the output. However, I think this doesn't
+ do its own rebuilds, and it's up to the caller to addHandler
+ around the creation of Note objects.
+ """
+ graph: SyncedGraph
+ uri: NoteUri
+ # simpleOutputs: SimpleOutputs
+ timed: bool = True
- if timed:
+ def __post_init__(self):
+ ec = self.graph.value(self.uri, L9['effectClass'])
+ if ec is None:
+ raise ValueError(f'note {self.uri} has no :effectClass')
+ self.effectClass = EffectClass(ec)
- def floatVal(s, p):
- return typedValue(float, g, s, p)
+ self.baseEffectSettings = self.getBaseEffectSettings()
- originTime = floatVal(uri, L9['originTime'])
+ if self.timed:
+ originTime = typedValue(float, self.graph, self.uri, L9['originTime'])
self.points: List[Tuple[float, float]] = []
- for curve in g.objects(uri, L9['curve']):
+ for curve in self.graph.objects(self.uri, L9['curve']):
self.points.extend(self.getCurvePoints(cast(Curve, curve), L9['strength'], originTime))
self.points.sort()
else:
self.points = []
+ def getBaseEffectSettings(self) -> BareEffectSettings:
+ """i think these are settings that are fixed over time,
+ e.g. that you set in the note's body in the timeline editor
+ """
+ out: Dict[EffectAttr, VTUnion] = {}
+ for s in self.graph.objects(self.uri, L9['setting']):
+ settingValues = dict(self.graph.predicate_objects(s))
+ ea = cast(EffectAttr, settingValues[L9['effectAttr']])
+ out[ea] = pyType(settingValues[L9['value']])
+ return BareEffectSettings(s=out)
+
def getCurvePoints(self, curve: Curve, attr, originTime: float) -> List[Tuple[float, float]]:
points = []
po = list(self.graph.predicate_objects(curve))
@@ -56,7 +92,7 @@ class Note:
t = cast(Literal, po2[L9['time']]).toPython()
if not isinstance(t, float):
raise TypeError
-
+
v = cast(Literal, po2[L9['value']]).toPython()
if not isinstance(v, float):
raise TypeError
@@ -81,34 +117,37 @@ class Note:
y = p1[1] + (p2[1] - p1[1]) * frac
return y
- def outputSettings(self, t: float, strength: Optional[float] = None) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
- """
- list of (device, attr, value), and a report for web
- """
+ def outputCurrent(self):
+ st = typedValue(float, self.graph, self.uri, L9['value'])
+ return self._outputSettings(t=None, strength=st)
+
+ def _outputSettings(
+ self,
+ t: float | None,
+ strength: Optional[float] = None #
+ ) -> Tuple[BareEffectSettings, Dict]:
+
if t is None:
if self.timed:
raise TypeError()
t = time.time() # so live effects will move
- report = {
+ report:Dict[str,Any] = {
'note': str(self.uri),
- 'effectClass': self.effectEval.effect,
+ 'effectClass': str(self.effectClass),
}
- strengthAttr = cast(DeviceAttr, L9['strength'])
-
- effectSettings: Dict[DeviceAttr, Union[float, str]] = dict((DeviceAttr(da), v) for da, v in self.baseEffectSettings.items())
- effectSettings[strengthAttr] = self.evalCurve(t) if strength is None else strength
+ s = self.evalCurve(t) if strength is None else strength
+ out = self.baseEffectSettings.withStrength(s)
+ report['effectSettings'] = dict((str(k), prettyFormat(v)) for k, v in sorted(out.s.items()))
+ report['nonZero'] = s > 0
- def prettyFormat(x: Union[float, str]):
- if isinstance(x, float):
- return round(x, 4)
- return x
+ return out, report
- report['effectSettings'] = dict((str(k), prettyFormat(v)) for k, v in sorted(effectSettings.items()))
- report['nonZero'] = cast(float, effectSettings[strengthAttr]) > 0
+ # old api had this
+
startTime = self.points[0][0] if self.timed else 0
out, evalReport = self.effectEval.outputFromEffect(
- list(effectSettings.items()),
+ effectSettings,
songTime=t,
# note: not using origin here since it's going away
noteTime=t - startTime)
diff --git a/light9/effect/sequencer/note_test.py b/light9/effect/sequencer/note_test.py
new file mode 100644
--- /dev/null
+++ b/light9/effect/sequencer/note_test.py
@@ -0,0 +1,79 @@
+import pytest
+
+from light9.effect.sequencer import Note
+from light9.effect.settings import BareEffectSettings
+from light9.mock_syncedgraph import MockSyncedGraph
+from light9.namespaces import L9
+from light9.newtypes import EffectAttr, NoteUri
+
+PREFIXES = '''
+@prefix : .
+@prefix effect: .
+@prefix rdfs: .
+@prefix show: .
+@prefix xsd: .
+@prefix dev: .
+@prefix dmxA: .
+'''
+
+FADER_GRAPH = PREFIXES + '''
+ :fade1
+ a :Fader;
+ :effectClass effect:effect1;
+ :effectAttr :strength;
+ :value 0.6 .
+'''
+
+
+class TestUntimedFaderNote:
+
+ def test_returnsEffectSettings(self):
+ g = MockSyncedGraph(FADER_GRAPH)
+ n = Note(g, NoteUri(L9['fade1']), timed=False)
+ out, report = n.outputCurrent()
+ assert report['effectSettings'] == {'http://light9.bigasterisk.com/strength': 0.6}
+ assert out == BareEffectSettings(s={EffectAttr(L9['strength']): 0.6})
+
+
+NOTE_GRAPH = PREFIXES + '''
+ :brightness
+ a :DeviceAttr;
+ rdfs:label "brightness";
+ :dataType :scalar .
+
+ :strength
+ a :EffectAttr;
+ rdfs:label "strength" .
+
+ :SimpleDimmer
+ a :DeviceClass;
+ rdfs:label "SimpleDimmer";
+ :deviceAttr :brightness;
+ :attr [ :outputAttr :level; :dmxOffset 0 ] .
+
+ dev:light1
+ a :SimpleDimmer;
+ :dmxUniverse dmxA:;
+ :dmxBase 178 .
+
+ effect:effect1
+ a :EffectClass;
+ :setting effect:effect1_set1 .
+ effect:effect1_set1
+ :device dev:light1;
+ :deviceAttr :brightness;
+ :scaledValue 0.5 .
+ :fade1
+ a :Fader;
+ :effectClass effect:effect1;
+ :effectAttr :strength;
+ :value 0.6 .
+
+ '''
+
+
+class TestTimedNote:
+
+ @pytest.mark.skip()
+ def test_scalesStrengthWithCurve(self):
+ pass
diff --git a/light9/effect/sequencer/sequencer.py b/light9/effect/sequencer/sequencer.py
--- a/light9/effect/sequencer/sequencer.py
+++ b/light9/effect/sequencer/sequencer.py
@@ -100,8 +100,7 @@ class Sequencer:
self.notes[song] = []
for note in self.graph.objects(song, L9['note']):
try:
- n = Note(self.graph, NoteUri(cast(NoteUri, note)), effecteval,
- self.simpleOutputs)
+ n = Note(self.graph, NoteUri(cast(NoteUri, note)))
except Exception:
log.warn(f"failed to build Note {note} - skipping")
anyErrors = True
diff --git a/light9/effect/settings.py b/light9/effect/settings.py
--- a/light9/effect/settings.py
+++ b/light9/effect/settings.py
@@ -2,19 +2,31 @@
Data structure and convertors for a table of (device,attr,value)
rows. These might be effect attrs ('strength'), device attrs ('rx'),
or output attrs (dmx channel).
+
+BareSettings means (attr,value), no device.
"""
+from __future__ import annotations
+
import decimal
+import logging
+from dataclasses import dataclass
+from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple, Union, cast
+from light9.localsyncedgraph import LocalSyncedGraph
+
import numpy
-from rdflib import URIRef, Literal
-from light9.namespaces import RDF, L9
-import logging
+from rdflib import Literal, URIRef
+
+from light9.collector.device import resolve
+from light9.namespaces import L9, RDF
+from light9.newtypes import DeviceAttr, DeviceUri, EffectAttr, HexColor, VTUnion
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+
log = logging.getLogger('settings')
-from light9.collector.device import resolve
-from typing import Sequence, Dict, Union, List
def parseHex(h):
- if h[0] != '#': raise ValueError(h)
+ if h[0] != '#':
+ raise ValueError(h)
return [int(h[i:i + 2], 16) for i in (1, 3, 5)]
@@ -22,10 +34,10 @@ def parseHexNorm(h):
return [x / 255 for x in parseHex(h)]
-def toHex(rgbFloat: Sequence[float]) -> str:
+def toHex(rgbFloat: Sequence[float]) -> HexColor:
assert len(rgbFloat) == 3
scaled = (max(0, min(255, int(v * 255))) for v in rgbFloat)
- return '#%02x%02x%02x' % tuple(scaled) # type: ignore
+ return HexColor('#%02x%02x%02x' % tuple(scaled))
def getVal(graph, subj):
@@ -36,73 +48,64 @@ def getVal(graph, subj):
return ret
+GraphType = SyncedGraph | LocalSyncedGraph
+
+
class _Settings:
"""
+ Generic for DeviceUri/DeviceAttr/VTUnion or EffectClass/EffectAttr/VTUnion
+
default values are 0 or '#000000'. Internal rep must not store zeros or some
comparisons will break.
"""
+ EntityType = DeviceUri
+ AttrType = DeviceAttr
- def __init__(self, graph, settingsList):
+ def __init__(self, graph: GraphType, settingsList: List[Tuple[Any, Any, VTUnion]]):
self.graph = graph # for looking up all possible attrs
- self._compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {
- } # dev: { attr: val }; val is number or colorhex
+ self._compiled: Dict[self.__class__.EntityType, Dict[self.__class__.AttrType, VTUnion]] = {}
for row in settingsList:
self._compiled.setdefault(row[0], {})[row[1]] = row[2]
# self._compiled may not be final yet- see _fromCompiled
self._delZeros()
@classmethod
- def _fromCompiled(cls, graph, compiled):
+ def _fromCompiled(cls, graph: GraphType, compiled: Dict[EntityType, Dict[AttrType, VTUnion]]):
obj = cls(graph, [])
obj._compiled = compiled
obj._delZeros()
return obj
@classmethod
- def fromResource(cls, graph, subj):
- settingsList = []
- with graph.currentState() as g:
- for s in g.objects(subj, L9['setting']):
- d = g.value(s, L9['device'])
- da = g.value(s, L9['deviceAttr'])
- v = getVal(g, s)
- settingsList.append((d, da, v))
- return cls(graph, settingsList)
-
- @classmethod
- def fromVector(cls, graph, vector, deviceAttrFilter=None):
- compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {}
- i = 0
- for (d, a) in cls(graph, [])._vectorKeys(deviceAttrFilter):
- if a == L9['color']:
- v = toHex(vector[i:i + 3])
- i += 3
- else:
- v = vector[i]
- i += 1
- compiled.setdefault(d, {})[a] = v
- return cls._fromCompiled(graph, compiled)
-
- @classmethod
- def fromList(cls, graph, others):
+ def fromList(cls, graph: GraphType, others: List[_Settings]):
"""note that others may have multiple values for an attr"""
- out = cls(graph, [])
+ self = cls(graph, [])
for s in others:
- if not isinstance(s, cls):
- raise TypeError(s)
+ # if not isinstance(s, cls):
+ # raise TypeError(s)
for row in s.asList(): # could work straight from s._compiled
if row[0] is None:
raise TypeError('bad row %r' % (row,))
dev, devAttr, value = row
- devDict = out._compiled.setdefault(dev, {})
+ devDict = self._compiled.setdefault(dev, {})
if devAttr in devDict:
- value = resolve(dev, devAttr, [devDict[devAttr], value])
+ existingVal: VTUnion = devDict[devAttr]
+ raise NotImplementedError('fixme: dev is to be a deviceclass (but it is currently unused)')
+ value = resolve(dev, devAttr, [existingVal, value])
devDict[devAttr] = value
- out._delZeros()
- return out
+ self._delZeros()
+ return self
@classmethod
- def fromBlend(cls, graph, others):
+ def _mult(cls, weight, row, dd) -> VTUnion:
+ if isinstance(row[2], str):
+ prev = parseHexNorm(dd.get(row[1], '#000000'))
+ return toHex(prev + weight * numpy.array(parseHexNorm(row[2])))
+ else:
+ return dd.get(row[1], 0) + weight * row[2]
+
+ @classmethod
+ def fromBlend(cls, graph: GraphType, others: List[Tuple[float, _Settings]]):
"""others is a list of (weight, Settings) pairs"""
out = cls(graph, [])
for weight, s in others:
@@ -113,19 +116,14 @@ class _Settings:
raise TypeError('bad row %r' % (row,))
dd = out._compiled.setdefault(row[0], {})
- if isinstance(row[2], str):
- prev = parseHexNorm(dd.get(row[1], '#000000'))
- newVal = toHex(prev +
- weight * numpy.array(parseHexNorm(row[2])))
- else:
- newVal = dd.get(row[1], 0) + weight * row[2]
+ newVal = cls._mult(weight, row, dd)
dd[row[1]] = newVal
out._delZeros()
return out
- def _zeroForAttr(self, attr):
+ def _zeroForAttr(self, attr: AttrType) -> VTUnion:
if attr == L9['color']:
- return '#000000'
+ return HexColor('#000000')
return 0.0
def _delZeros(self):
@@ -137,15 +135,12 @@ class _Settings:
del self._compiled[dev]
def __hash__(self):
- itemed = tuple([(d, tuple([(a, v)
- for a, v in sorted(av.items())]))
- for d, av in sorted(self._compiled.items())])
+ itemed = tuple([(d, tuple([(a, v) for a, v in sorted(av.items())])) for d, av in sorted(self._compiled.items())])
return hash(itemed)
def __eq__(self, other):
if not issubclass(other.__class__, self.__class__):
- raise TypeError("can't compare %r to %r" %
- (self.__class__, other.__class__))
+ raise TypeError("can't compare %r to %r" % (self.__class__, other.__class__))
return self._compiled == other._compiled
def __ne__(self, other):
@@ -160,24 +155,24 @@ class _Settings:
def accum():
for dev, av in self._compiled.items():
for attr, val in sorted(av.items()):
- words.append(
- '%s.%s=%s' %
- (dev.rsplit('/')[-1], attr.rsplit('/')[-1], val))
+ words.append('%s.%s=%s' % (dev.rsplit('/')[-1], attr.rsplit('/')[-1], val))
if len(words) > 5:
words.append('...')
return
accum()
+ if not words:
+ words = ['(no settings)']
return '<%s %s>' % (self.__class__.__name__, ' '.join(words))
- def getValue(self, dev, attr):
+ def getValue(self, dev: EntityType, attr: AttrType):
return self._compiled.get(dev, {}).get(attr, self._zeroForAttr(attr))
def _vectorKeys(self, deviceAttrFilter=None):
"""stable order of all the dev,attr pairs for this type of settings"""
raise NotImplementedError
- def asList(self):
+ def asList(self) -> List[Tuple[EntityType, AttrType, VTUnion]]:
"""old style list of (dev, attr, val) tuples"""
out = []
for dev, av in self._compiled.items():
@@ -185,26 +180,27 @@ class _Settings:
out.append((dev, attr, val))
return out
- def devices(self):
+ def devices(self) -> List[EntityType]:
return list(self._compiled.keys())
- def toVector(self, deviceAttrFilter=None):
+ def toVector(self, deviceAttrFilter=None) -> List[float]:
out: List[float] = []
for dev, attr in self._vectorKeys(deviceAttrFilter):
v = self.getValue(dev, attr)
if attr == L9['color']:
out.extend(parseHexNorm(v))
else:
+ if not isinstance(v, float):
+ raise TypeError(f'{attr=} value was {v=}')
out.append(v)
return out
- def byDevice(self):
+ def byDevice(self) -> Iterable[Tuple[EntityType, _Settings]]:
for dev, av in self._compiled.items():
yield dev, self.__class__._fromCompiled(self.graph, {dev: av})
- def ofDevice(self, dev):
- return self.__class__._fromCompiled(self.graph,
- {dev: self._compiled.get(dev, {})})
+ def ofDevice(self, dev: EntityType) -> _Settings:
+ return self.__class__._fromCompiled(self.graph, {dev: self._compiled.get(dev, {})})
def distanceTo(self, other):
diff = numpy.array(self.toVector()) - other.toVector()
@@ -212,7 +208,7 @@ class _Settings:
log.info('distanceTo %r - %r = %g', self, other, d)
return d
- def statements(self, subj, ctx, settingRoot, settingsSubgraphCache):
+ def statements(self, subj: EntityType, ctx: URIRef, settingRoot: URIRef, settingsSubgraphCache: Set):
"""
settingRoot can be shared across images (or even wider if you want)
"""
@@ -227,8 +223,7 @@ class _Settings:
continue
scaledAttributeTypes = [L9['color'], L9['brightness'], L9['uv']]
- settingType = L9[
- 'scaledValue'] if attr in scaledAttributeTypes else L9['value']
+ settingType = L9['scaledValue'] if attr in scaledAttributeTypes else L9['value']
if not isinstance(val, URIRef):
val = Literal(val)
add.extend([
@@ -242,6 +237,8 @@ class _Settings:
class DeviceSettings(_Settings):
+ EntityType = DeviceUri
+ AttrType = DeviceAttr
def _vectorKeys(self, deviceAttrFilter=None):
with self.graph.currentState() as g:
@@ -258,3 +255,47 @@ class DeviceSettings(_Settings):
continue
keys.append(key)
return keys
+
+ @classmethod
+ def fromResource(cls, graph: GraphType, subj: EntityType):
+ settingsList = []
+ with graph.currentState() as g:
+ for s in g.objects(subj, L9['setting']):
+ d = g.value(s, L9['device'])
+ da = g.value(s, L9['deviceAttr'])
+ v = getVal(g, s)
+ settingsList.append((d, da, v))
+ return cls(graph, settingsList)
+
+ @classmethod
+ def fromVector(cls, graph, vector, deviceAttrFilter=None):
+ compiled: Dict[DeviceSettings.EntityType, Dict[DeviceSettings.AttrType, VTUnion]] = {}
+ i = 0
+ for (d, a) in cls(graph, [])._vectorKeys(deviceAttrFilter):
+ if a == L9['color']:
+ v = toHex(vector[i:i + 3])
+ i += 3
+ else:
+ v = vector[i]
+ i += 1
+ compiled.setdefault(d, {})[a] = v
+ return cls._fromCompiled(graph, compiled)
+
+ @classmethod
+ def merge(cls, graph: SyncedGraph, others: List[DeviceSettings]) -> DeviceSettings:
+ return cls.fromList(graph, cast(List[_Settings], others))
+
+
+@dataclass
+class BareEffectSettings:
+ # settings not specific to any effect
+ s: Dict[EffectAttr, VTUnion]
+
+ def withStrength(self, strength: float) -> BareEffectSettings:
+ out = self.s.copy()
+ out[EffectAttr(L9['strength'])] = strength
+ return BareEffectSettings(s=out)
+
+
+class EffectSettings(_Settings):
+ pass
diff --git a/light9/effect/settings_test.py b/light9/effect/settings_test.py
--- a/light9/effect/settings_test.py
+++ b/light9/effect/settings_test.py
@@ -1,4 +1,6 @@
+from typing import cast
import unittest
+from light9.newtypes import DeviceAttr, DeviceUri, HexColor, VTUnion
from rdflib import Literal
from rdfdb.patch import Patch
from light9.localsyncedgraph import LocalSyncedGraph
@@ -52,7 +54,7 @@ class TestDeviceSettings(unittest.TestCa
(L9['foo_set1'], L9['deviceAttr'], L9['speed'], ctx),
(L9['foo_set1'], L9['scaledValue'], Literal(0.2), ctx),
]))
- s = DeviceSettings.fromResource(self.graph, L9['foo'])
+ s = DeviceSettings.fromResource(self.graph, DeviceUri(L9['foo']))
self.assertEqual(
DeviceSettings(self.graph, [
@@ -62,8 +64,8 @@ class TestDeviceSettings(unittest.TestCa
def testToVector(self):
v = DeviceSettings(self.graph, [
- (DEV['aura1'], L9['rx'], 0.5),
- (DEV['aura1'], L9['color'], '#00ff00'),
+ (DeviceUri(DEV['aura1']), DeviceAttr(L9['rx']), 0.5),
+ (DeviceUri(DEV['aura1']), DeviceAttr(L9['color']), HexColor('#00ff00')),
]).toVector()
self.assertEqual([
0, 1, 0, 0.5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
@@ -78,14 +80,14 @@ class TestDeviceSettings(unittest.TestCa
self.assertEqual(
DeviceSettings(self.graph, [
- (DEV['aura1'], L9['rx'], 0.5),
- (DEV['aura1'], L9['color'], '#00ff00'),
+ (DeviceUri(DEV['aura1']), DeviceAttr(L9['rx']), 0.5),
+ (DeviceUri(DEV['aura1']), DeviceAttr(L9['color']), HexColor('#00ff00')),
]), s)
def testAsList(self):
sets = [
- (L9['light1'], L9['attr2'], 0.3),
- (L9['light1'], L9['attr1'], 0.5),
+ (DeviceUri(L9['light1']), DeviceAttr(L9['attr2']), cast(VTUnion,0.3)),
+ (DeviceUri(L9['light1']), DeviceAttr(L9['attr1']), 0.5),
]
self.assertCountEqual(sets, DeviceSettings(self.graph, sets).asList())
@@ -101,7 +103,7 @@ class TestDeviceSettings(unittest.TestCa
s = DeviceSettings(self.graph, [
(DEV['aura2'], L9['rx'], 0.1),
])
- stmts = s.statements(L9['foo'], L9['ctx1'], L9['s_'], set())
+ stmts = s.statements(DeviceUri(L9['foo']), L9['ctx1'], L9['s_'], set())
self.maxDiff = None
setting = sorted(stmts)[-1][0]
self.assertCountEqual([
@@ -157,8 +159,8 @@ class TestFromBlend(unittest.TestCase):
def testMixColors(self):
self.assertEqual(
- DeviceSettings(self.graph, [(L1, ZOOM, '#503000')]),
+ DeviceSettings(self.graph, [(L1, ZOOM, HexColor('#503000'))]),
DeviceSettings.fromBlend(self.graph, [
- (.25, DeviceSettings(self.graph, [(L1, ZOOM, '#800000')])),
- (.5, DeviceSettings(self.graph, [(L1, ZOOM, '#606000')])),
+ (.25, DeviceSettings(self.graph, [(L1, ZOOM, HexColor('#800000'))])),
+ (.5, DeviceSettings(self.graph, [(L1, ZOOM, HexColor('#606000'))])),
]))
diff --git a/light9/effect/simple_outputs.py b/light9/effect/simple_outputs.py
--- a/light9/effect/simple_outputs.py
+++ b/light9/effect/simple_outputs.py
@@ -21,6 +21,9 @@ class SimpleOutputs:
def updateEffectsFromGraph(self):
for effect in self.graph.subjects(RDF.type, L9['Effect']):
+ raise TypeError('change graph from Effect to EffectClass')
+
+ for effect in self.graph.subjects(RDF.type, L9['EffectClass']):
settings = []
for setting in self.graph.objects(effect, L9['setting']):
settingValues = dict(self.graph.predicate_objects(setting))
diff --git a/light9/newtypes.py b/light9/newtypes.py
--- a/light9/newtypes.py
+++ b/light9/newtypes.py
@@ -1,3 +1,4 @@
+import decimal
from typing import Tuple, NewType, Type, TypeVar, Union, cast
from rdflib import URIRef
from rdflib.term import Node
@@ -11,6 +12,8 @@ DeviceClass = NewType('DeviceClass', URI
DmxIndex = NewType('DmxIndex', int) # 1..512
DmxMessageIndex = NewType('DmxMessageIndex', int) # 0..511
DeviceAttr = NewType('DeviceAttr', URIRef) # e.g. :rx
+EffectClass = NewType('EffectClass', URIRef) # e.g. effect:chase
+EffectAttr = NewType('EffectAttr', URIRef) # e.g. :chaseSpeed
NoteUri = NewType('NoteUri', URIRef)
OutputAttr = NewType('OutputAttr', URIRef) # e.g. :xFine
OutputValue = NewType('OutputValue', int) # byte in dmx message
@@ -45,5 +48,6 @@ def typedValue(objType: Type[_ObjType],
if obj is None:
raise ValueError()
conv = obj if _isSubclass2(objType, Node) else obj.toPython()
- # may need to turn Decimal to float here
+ if objType is float and isinstance(conv, decimal.Decimal):
+ conv = float(conv)
return cast(objType, conv)
\ No newline at end of file