WIP redoing how Note works. The new Note outputs EffectSettings only,
and callers have to effect_eval them. Still not sure what SimpleOutputs does.
from rdflib import Literal, URIRef, Namespace
from light9.namespaces import L9, DEV
from webcolors import rgb_to_hex, hex_to_rgb
from dataclasses import dataclass
import logging
import math
import random
from colorsys import hsv_to_rgb
import math
from typing import Any, Dict, Tuple
from light9.effect.simple_outputs import SimpleOutputs
from light9.newtypes import DeviceAttr, DeviceUri, EffectAttr, EffectClass, VTUnion

from noise import pnoise1
import logging
from light9.effect.settings import DeviceSettings
from PIL import Image
from rdflib import Literal, Namespace, URIRef
from webcolors import hex_to_rgb, rgb_to_hex

from light9.effect.scale import scale
from typing import Dict, Tuple, Any
from PIL import Image
import random
from light9.effect.settings import BareEffectSettings, DeviceSettings, EffectSettings
from light9.namespaces import DEV, L9
from rdfdb.syncedgraph.syncedgraph import SyncedGraph

SKY = Namespace('')


log = logging.getLogger('effecteval')"reload effecteval")


def literalColor(rnorm, gnorm, bnorm):
    return Literal(rgb_to_hex((
        int(rnorm * 255),  #
        int(gnorm * 255),  # 
        int(bnorm * 255))))


def literalColorHsv(h, s, v):
    return literalColor(*hsv_to_rgb(h, s, v))


def nsin(x):
    return (math.sin(x * (2 * math.pi)) + 1) / 2


def lerp(a, b, t):
    return a + (b - a) * t


def noise(t):
    return pnoise1(t % 1000.0, 2)


def clamp(lo, hi, x):
    return max(lo, min(hi, x))


def clamp255(x):
    return min(255, max(0, x))


def _8bit(f):
    if not isinstance(f, (int, float)):
        raise TypeError(repr(f))
    return clamp255(int(f * 255))


class EffectEval:
    runs one effect's code to turn effect attr settings into output
    device settings. No state; suitable for reload().
    graph: SyncedGraph
    effect: EffectClass
    simpleOutputs: SimpleOutputs

    def __init__(self, graph, effect, simpleOutputs):
        self.graph = graph
        self.effect = effect
        self.simpleOutputs = simpleOutputs

    def outputFromEffect(self, effectSettings, songTime, noteTime):
    def outputFromEffect(self, effectSettings: BareEffectSettings, songTime: float, noteTime: float) -> Tuple[EffectSettings, Dict]:
        From effect attr settings, like strength=0.75, to output device
        settings like light1/bright=0.72;light2/bright=0.78. This runs
        the effect code.
        # both callers need to apply note overrides
        effectSettings = dict(effectSettings)  # we should make everything into nice float and Color objects too

        strength = float(effectSettings[L9['strength']])
        print(f'outputFromEffect({effectSettings=}, {songTime=}, {noteTime=})')
        strength = float(effectSettings.s[EffectAttr(L9['strength'])])
        if strength <= 0:
            return DeviceSettings(self.graph, []), {'zero': True}
            return EffectSettings(self.graph, []), {'zero': True}

        report = {}
        out: Dict[Tuple[URIRef, URIRef], Any] = {}  # (dev, attr): value
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}  # (dev, attr): value

        out.update(self.simpleOutputs.values(self.effect, strength, effectSettings.get(L9['colorScale'], None)))
        out.update(self.simpleOutputs.values(self.effect, strength, effectSettings.s.get(EffectAttr(L9['colorScale']), None)))

        if self.effect.startswith(L9['effect/']):
            tail = 'effect_' + self.effect[len(L9['effect/']):]
                func = globals()[tail]
            except KeyError:
                report['error'] = 'effect code not found for %s' % self.effect
                out.update(func(effectSettings, strength, songTime, noteTime))

        outList = [(d, a, v) for (d, a), v in out.items()]
        return DeviceSettings(self.graph, outList), report
        return EffectSettings(self.graph, outList), report


def effect_Curtain(effectSettings, strength, songTime, noteTime):
    return {(L9['device/lowPattern%s' % n], L9['color']): literalColor(strength, strength, strength) for n in range(301, 308 + 1)}


def effect_animRainbow(effectSettings, strength, songTime, noteTime):
    out = {}
    tint = effectSettings.get(L9['tint'], '#ffffff')
    tintStrength = float(effectSettings.get(L9['tintStrength'], 0))
    tr, tg, tb = hex_to_rgb(tint)
    for n in range(1, 5 + 1):
        scl = strength * nsin(songTime + n * .3)**3
        col = literalColor(scl * lerp(nsin(songTime + n * .2), tr / 255, tintStrength), scl * lerp(nsin(songTime + n * .2 + .3), tg / 255, tintStrength),
                           scl * lerp(nsin(songTime + n * .3 + .6), tb / 255, tintStrength))

        dev = L9['device/aura%s' % n]
            (dev, L9['color']): col,
            (dev, L9['zoom']): .9,
        ang = songTime * 4
            (dev, L9['rx']): lerp(.27, .7, (n - 1) / 4) + .2 * math.sin(ang + n),
import asyncio
import logging
import time
from typing import Callable, Coroutine, List, cast

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef

from light9.effect import effecteval
from light9.effect.sequencer import Note
from light9.effect.settings import DeviceSettings
from light9.effect.simple_outputs import SimpleOutputs
from light9.metrics import metrics
from light9.namespaces import L9, RDF
from light9.newtypes import NoteUri
from light9.newtypes import NoteUri, UnixTime

log = logging.getLogger('sequencer')
log = logging.getLogger('seq.fader')

class FaderEval:
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector
    The current faders become Notes in here, for more code reuse.
    def __init__(self,
                 graph: SyncedGraph,
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
        self.graph = graph
        self.sendToCollector = sendToCollector

        # Notes without times- always on
        self.notes: List[Note] = []

        self.simpleOutputs = SimpleOutputs(self.graph)
        self.lastLoopSucceeded = False

        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
'startupdating task')
        # have caller do this

    async def startUpdating(self):
        await self.graph.addAsyncHandler(self.update)
'startupdating task done')

    def onCodeChange(self):

    def compileGraph(self) -> None:
        """rebuild our data from the graph"""
        self.notes = []
        for fader in self.graph.subjects(RDF.type, L9['Fader']):          
            def compileFader() -> Note:
                return self.compileFader(cast(URIRef, fader))

        if self.notes:
        # if self.notes:
        #     asyncio.create_task(self.startUpdating())


    def compileFader(self, fader: URIRef) -> Note:
        return Note(self.graph, NoteUri(cast(NoteUri, fader)), effecteval,
                self.simpleOutputs, timed=False)
        return Note(self.graph, cast(NoteUri, fader), 
    def _computeOutput(self) -> DeviceSettings:
        notesSettings = []
        now = UnixTime(time.time())
        for note in self.notes:
            effectSettings, report = note.outputCurrent()

            ee = effecteval.EffectEval(self.graph, note.effectClass, self.simpleOutputs)
            deviceSettings, report = ee.outputFromEffect(
                songTime=now, # probably wrong
                noteTime=now, # wrong
        return DeviceSettings.merge(self.graph, notesSettings)


    async def update(self):
        settings = []
        for note in self.notes:
            effectValue = self.graph.value(note.uri, L9['value'])
            if effectValue is None:
      'skip note {note}, no :value')
            s, report = note.outputSettings(t=time.time(), strength=float(effectValue))
        devSettings = DeviceSettings.fromList(self.graph, settings)
'update {len(self.notes)=}')
        devSettings = self._computeOutput()
        with metrics('update_s3_send_fader').time():  # our measurement
            sendSecs = await self.sendToCollector(devSettings)
from unittest import mock
from light9.effect.sequencer.eval_faders import FaderEval
from light9.effect.settings import DeviceSettings
from light9.mock_syncedgraph import MockSyncedGraph
from light9.namespaces import L9


@prefix : <> .
@prefix effect: <> .
@prefix rdfs: <> .
@prefix show: <> .
@prefix xsd: <> .
@prefix dev: <> .
@prefix dmxA: <> .

                a :DeviceAttr; 
                rdfs:label "brightness"; 
                :dataType :scalar .

                a :EffectAttr;
                rdfs:label "strength" .

                a :DeviceClass; 
                rdfs:label "SimpleDimmer";
                :deviceAttr :brightness;
                :attr [ :outputAttr :level; :dmxOffset 0 ] .
                a :SimpleDimmer;
                :dmxUniverse dmxA:;
                :dmxBase 178 .

                a :EffectClass;
                :setting effect:effect1_set1 .
                :device :light1; 
                :deviceAttr :brightness; 
                :scaledValue 0.5 .
                a :Fader; 
                :effectClass effect:effect1; 
                :effectAttr :strength; 
                :value 0.6 .


class TestFaderEval:
    def test_faderValueScalesEffectSettings(self):
        g = MockSyncedGraph(NOTE_GRAPH)
        sender = mock.MagicMock()
        f = FaderEval(g, sender)
        devSettings = f._computeOutput()
        assert devSettings == DeviceSettings(g, [(L9['light1'], L9['brightness'], 0.3)])
\ No newline at end of file
import bisect
from dataclasses import dataclass
import logging
import time
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from light9.effect.effecteval import EffectEval
from light9.effect.settings import BareEffectSettings, DeviceSettings, EffectSettings
from light9.effect.simple_outputs import SimpleOutputs

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import Literal, URIRef

from light9.namespaces import L9
from light9.newtypes import Curve, DeviceAttr, DeviceUri, NoteUri, typedValue
from light9.newtypes import Curve, DeviceAttr, EffectAttr, EffectClass, NoteUri, VTUnion, typedValue

log = logging.getLogger('sequencer')


def pyType(n):
    ret = n.toPython()
    if isinstance(ret, Decimal):
        return float(ret)
    return ret


def prettyFormat(x: Union[float, str]):
    if isinstance(x, float):
        return round(x, 4)
    return x


class Note:
    """A Note outputs EffectAttr settings.
    Sample graph:
     :note1 a :Note; :curve :n1c1; :effectClass effect:allcolor;

    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs, timed=True):
        g = self.graph = graph
        self.uri = uri
        self.timed = timed
        self.effectEval = effectevalModule.EffectEval(graph, g.value(uri, L9['effectClass']), simpleOutputs)
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
        for s in g.objects(uri, L9['setting']):
            settingValues = dict(g.predicate_objects(s))
            ea = cast(URIRef, settingValues[L9['effectAttr']])
            self.baseEffectSettings[ea] = pyType(settingValues[L9['value']])
    It can animate the EffectAttr settings over time, in two ways:
     * a `timed note` has an envelope curve that animates 
       the :strength EffectAttr over time
     * an `untimed note` has no curve, a fixed strength, but 
       still passes the wall clock time to its effect so the 
       effect can include animation. A `Fader` is an untimed note.
    This obj is immutable, I think, but the graph can change, 
    which can affect the output. However, I think this doesn't 
    do its own rebuilds, and it's up to the caller to addHandler 
    around the creation of Note objects.
    graph: SyncedGraph
    uri: NoteUri
    # simpleOutputs: SimpleOutputs
    timed: bool = True

        if timed:
    def __post_init__(self):
        ec = self.graph.value(self.uri, L9['effectClass'])
        if ec is None:
            raise ValueError(f'note {self.uri} has no :effectClass')
        self.effectClass = EffectClass(ec)

            def floatVal(s, p):
                return typedValue(float, g, s, p)
        self.baseEffectSettings = self.getBaseEffectSettings()

            originTime = floatVal(uri, L9['originTime'])
        if self.timed:
            originTime = typedValue(float, self.graph, self.uri, L9['originTime'])
            self.points: List[Tuple[float, float]] = []
            for curve in g.objects(uri, L9['curve']):
            for curve in self.graph.objects(self.uri, L9['curve']):
                self.points.extend(self.getCurvePoints(cast(Curve, curve), L9['strength'], originTime))
            self.points = []

    def getBaseEffectSettings(self) -> BareEffectSettings:
        """i think these are settings that are fixed over time, 
        e.g. that you set in the note's body in the timeline editor
        out: Dict[EffectAttr, VTUnion] = {}
        for s in self.graph.objects(self.uri, L9['setting']):
            settingValues = dict(self.graph.predicate_objects(s))
            ea = cast(EffectAttr, settingValues[L9['effectAttr']])
            out[ea] = pyType(settingValues[L9['value']])
        return BareEffectSettings(s=out)

    def getCurvePoints(self, curve: Curve, attr, originTime: float) -> List[Tuple[float, float]]:
        points = []
        po = list(self.graph.predicate_objects(curve))
        if dict(po).get(L9['attr'], None) != attr:
            return []
        for point in [row[1] for row in po if row[0] == L9['point']]:
            po2 = dict(self.graph.predicate_objects(point))
            t = cast(Literal, po2[L9['time']]).toPython()
            if not isinstance(t, float):
                raise TypeError

            v = cast(Literal, po2[L9['value']]).toPython()
            if not isinstance(v, float):
                raise TypeError
            points.append((originTime + t, v))
        return points

    def activeAt(self, t: float) -> bool:
        return self.points[0][0] <= t <= self.points[-1][0]

    def evalCurve(self, t: float) -> float:
        i = bisect.bisect_left(self.points, (t, None)) - 1

        if i == -1:
            return self.points[0][1]
        if self.points[i][0] > t:
            return self.points[i][1]
        if i >= len(self.points) - 1:
            return self.points[i][1]

        p1, p2 = self.points[i], self.points[i + 1]
        frac = (t - p1[0]) / (p2[0] - p1[0])
        y = p1[1] + (p2[1] - p1[1]) * frac
        return y

    def outputSettings(self, t: float, strength: Optional[float] = None) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
        list of (device, attr, value), and a report for web
    def outputCurrent(self):      
        st = typedValue(float, self.graph, self.uri, L9['value'])
        return self._outputSettings(t=None, strength=st)

    def _outputSettings(
            t: float | None,
            strength: Optional[float] = None  #
    ) -> Tuple[BareEffectSettings, Dict]:
        if t is None:
            if self.timed:
                raise TypeError()
            t = time.time()  # so live effects will move
        report = {
        report:Dict[str,Any] = {
            'note': str(self.uri),
            'effectClass': self.effectEval.effect,
            'effectClass': str(self.effectClass),

        strengthAttr = cast(DeviceAttr, L9['strength'])

        effectSettings: Dict[DeviceAttr, Union[float, str]] = dict((DeviceAttr(da), v) for da, v in self.baseEffectSettings.items())
        effectSettings[strengthAttr] = self.evalCurve(t) if strength is None else strength
        s = self.evalCurve(t) if strength is None else strength
        out = self.baseEffectSettings.withStrength(s)
        report['effectSettings'] = dict((str(k), prettyFormat(v)) for k, v in sorted(out.s.items()))
        report['nonZero'] = s > 0

        def prettyFormat(x: Union[float, str]):
            if isinstance(x, float):
                return round(x, 4)
            return x
        return out, report

        report['effectSettings'] = dict((str(k), prettyFormat(v)) for k, v in sorted(effectSettings.items()))
        report['nonZero'] = cast(float, effectSettings[strengthAttr]) > 0
        # old api had this

        startTime = self.points[0][0] if self.timed else 0
        out, evalReport = self.effectEval.outputFromEffect(
            # note: not using origin here since it's going away
            noteTime=t - startTime)
        report['devicesAffected'] = len(out.devices())
        return out, report
Show inline comments
import pytest

from light9.effect.sequencer import Note
from light9.effect.settings import BareEffectSettings
from light9.mock_syncedgraph import MockSyncedGraph
from light9.namespaces import L9
from light9.newtypes import EffectAttr, NoteUri

@prefix : <> .
@prefix effect: <> .
@prefix rdfs: <> .
@prefix show: <> .
@prefix xsd: <> .
@prefix dev: <> .
@prefix dmxA: <> .

        a :Fader; 
        :effectClass effect:effect1; 
        :effectAttr :strength; 
        :value 0.6 .


class TestUntimedFaderNote:

    def test_returnsEffectSettings(self):
        g = MockSyncedGraph(FADER_GRAPH)
        n = Note(g, NoteUri(L9['fade1']), timed=False)
        out, report = n.outputCurrent()
        assert report['effectSettings'] == {'': 0.6}
        assert out == BareEffectSettings(s={EffectAttr(L9['strength']): 0.6})


                a :DeviceAttr; 
                rdfs:label "brightness"; 
                :dataType :scalar .

                a :EffectAttr;
                rdfs:label "strength" .

                a :DeviceClass; 
                rdfs:label "SimpleDimmer";
                :deviceAttr :brightness;
                :attr [ :outputAttr :level; :dmxOffset 0 ] .
                a :SimpleDimmer;
                :dmxUniverse dmxA:;
                :dmxBase 178 .

                a :EffectClass;
                :setting effect:effect1_set1 .
                :device dev:light1; 
                :deviceAttr :brightness; 
                :scaledValue 0.5 .
                a :Fader; 
                :effectClass effect:effect1; 
                :effectAttr :strength; 
                :value 0.6 .



class TestTimedNote:

    def test_scalesStrengthWithCurve(self):
        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)

    def onCodeChange(self):

    def compileGraph(self) -> None:
        """rebuild our data from the graph"""
        for song in self.graph.subjects(RDF.type, L9['Song']):

            def compileSong(song: Song = cast(Song, song)) -> None:


    def compileSong(self, song: Song) -> None:
        anyErrors = False
        self.notes[song] = []
        for note in self.graph.objects(song, L9['note']):
                n = Note(self.graph, NoteUri(cast(NoteUri, note)), effecteval,
                n = Note(self.graph, NoteUri(cast(NoteUri, note)))
            except Exception:
                log.warn(f"failed to build Note {note} - skipping")
                anyErrors = True
        if not anyErrors:
  'built all notes for {song}')

    async def updateLoop(self):
        while True:
            frameStart = time.time()
                sec = await self.update()
            except Exception as e:
                self.lastLoopSucceeded = False
                log.warn('updateLoop: %r', e)
                await asyncio.sleep(1)
                took = time.time() - frameStart

                if not self.lastLoopSucceeded:
Data structure and convertors for a table of (device,attr,value)
rows. These might be effect attrs ('strength'), device attrs ('rx'),
or output attrs (dmx channel).

BareSettings means (attr,value), no device.
from __future__ import annotations

import decimal
import logging
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple, Union, cast
from light9.localsyncedgraph import LocalSyncedGraph

import numpy
from rdflib import URIRef, Literal
from light9.namespaces import RDF, L9
import logging
from rdflib import Literal, URIRef

from light9.collector.device import resolve
from light9.namespaces import L9, RDF
from light9.newtypes import DeviceAttr, DeviceUri, EffectAttr, HexColor, VTUnion
from rdfdb.syncedgraph.syncedgraph import SyncedGraph

log = logging.getLogger('settings')
from light9.collector.device import resolve
from typing import Sequence, Dict, Union, List


def parseHex(h):
    if h[0] != '#': raise ValueError(h)
    if h[0] != '#':
        raise ValueError(h)
    return [int(h[i:i + 2], 16) for i in (1, 3, 5)]


def parseHexNorm(h):
    return [x / 255 for x in parseHex(h)]


def toHex(rgbFloat: Sequence[float]) -> str:
def toHex(rgbFloat: Sequence[float]) -> HexColor:
    assert len(rgbFloat) == 3
    scaled = (max(0, min(255, int(v * 255))) for v in rgbFloat)
    return '#%02x%02x%02x' % tuple(scaled)  # type: ignore
    return HexColor('#%02x%02x%02x' % tuple(scaled))


def getVal(graph, subj):
    lit = graph.value(subj, L9['value']) or graph.value(subj, L9['scaledValue'])
    ret = lit.toPython()
    if isinstance(ret, decimal.Decimal):
        ret = float(ret)
    return ret


GraphType = SyncedGraph | LocalSyncedGraph


class _Settings:
    Generic for DeviceUri/DeviceAttr/VTUnion or EffectClass/EffectAttr/VTUnion

    default values are 0 or '#000000'. Internal rep must not store zeros or some
    comparisons will break.
    EntityType = DeviceUri
    AttrType = DeviceAttr

    def __init__(self, graph, settingsList):
    def __init__(self, graph: GraphType, settingsList: List[Tuple[Any, Any, VTUnion]]):
        self.graph = graph  # for looking up all possible attrs
        self._compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {
        }  # dev: { attr: val }; val is number or colorhex
        self._compiled: Dict[self.__class__.EntityType, Dict[self.__class__.AttrType, VTUnion]] = {}
        for row in settingsList:
            self._compiled.setdefault(row[0], {})[row[1]] = row[2]
        # self._compiled may not be final yet- see _fromCompiled

    def _fromCompiled(cls, graph, compiled):
    def _fromCompiled(cls, graph: GraphType, compiled: Dict[EntityType, Dict[AttrType, VTUnion]]):
        obj = cls(graph, [])
        obj._compiled = compiled
        return obj

    def fromResource(cls, graph, subj):
        settingsList = []
        with graph.currentState() as g:
            for s in g.objects(subj, L9['setting']):
                d = g.value(s, L9['device'])
                da = g.value(s, L9['deviceAttr'])
                v = getVal(g, s)
                settingsList.append((d, da, v))
        return cls(graph, settingsList)

    def fromVector(cls, graph, vector, deviceAttrFilter=None):
        compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {}
        i = 0
        for (d, a) in cls(graph, [])._vectorKeys(deviceAttrFilter):
            if a == L9['color']:
                v = toHex(vector[i:i + 3])
                i += 3
                v = vector[i]
                i += 1
            compiled.setdefault(d, {})[a] = v
        return cls._fromCompiled(graph, compiled)

    def fromList(cls, graph, others):
    def fromList(cls, graph: GraphType, others: List[_Settings]):
        """note that others may have multiple values for an attr"""
        out = cls(graph, [])
        self = cls(graph, [])
        for s in others:
            if not isinstance(s, cls):
                raise TypeError(s)
            # if not isinstance(s, cls):
            #     raise TypeError(s)
            for row in s.asList():  # could work straight from s._compiled
                if row[0] is None:
                    raise TypeError('bad row %r' % (row,))
                dev, devAttr, value = row
                devDict = out._compiled.setdefault(dev, {})
                devDict = self._compiled.setdefault(dev, {})
                if devAttr in devDict:
                    value = resolve(dev, devAttr, [devDict[devAttr], value])
                    existingVal: VTUnion = devDict[devAttr]
                    raise NotImplementedError('fixme: dev is to be a deviceclass (but it is currently unused)')
                    value = resolve(dev, devAttr, [existingVal, value])
                devDict[devAttr] = value
        return out
        return self

    def fromBlend(cls, graph, others):
    def _mult(cls, weight, row, dd) -> VTUnion:
        if isinstance(row[2], str):
            prev = parseHexNorm(dd.get(row[1], '#000000'))
            return toHex(prev + weight * numpy.array(parseHexNorm(row[2])))
            return dd.get(row[1], 0) + weight * row[2]

    def fromBlend(cls, graph: GraphType, others: List[Tuple[float, _Settings]]):
        """others is a list of (weight, Settings) pairs"""
        out = cls(graph, [])
        for weight, s in others:
            if not isinstance(s, cls):
                raise TypeError(s)
            for row in s.asList():  # could work straight from s._compiled
                if row[0] is None:
                    raise TypeError('bad row %r' % (row,))
                dd = out._compiled.setdefault(row[0], {})

                if isinstance(row[2], str):
                    prev = parseHexNorm(dd.get(row[1], '#000000'))
                    newVal = toHex(prev +
                                   weight * numpy.array(parseHexNorm(row[2])))
                    newVal = dd.get(row[1], 0) + weight * row[2]
                newVal = cls._mult(weight, row, dd)
                dd[row[1]] = newVal
        return out

    def _zeroForAttr(self, attr):
    def _zeroForAttr(self, attr: AttrType) -> VTUnion:
        if attr == L9['color']:
            return '#000000'
            return HexColor('#000000')
        return 0.0

    def _delZeros(self):
        for dev, av in list(self._compiled.items()):
            for attr, val in list(av.items()):
                if val == self._zeroForAttr(attr):
                    del av[attr]
            if not av:
                del self._compiled[dev]

    def __hash__(self):
        itemed = tuple([(d, tuple([(a, v)
                                   for a, v in sorted(av.items())]))
                        for d, av in sorted(self._compiled.items())])
        itemed = tuple([(d, tuple([(a, v) for a, v in sorted(av.items())])) for d, av in sorted(self._compiled.items())])
        return hash(itemed)

    def __eq__(self, other):
        if not issubclass(other.__class__, self.__class__):
            raise TypeError("can't compare %r to %r" %
                            (self.__class__, other.__class__))
            raise TypeError("can't compare %r to %r" % (self.__class__, other.__class__))
        return self._compiled == other._compiled

    def __ne__(self, other):
        return not self == other

    def __bool__(self):
        return bool(self._compiled)

    def __repr__(self):
        words = []

        def accum():
            for dev, av in self._compiled.items():
                for attr, val in sorted(av.items()):
                        '%s.%s=%s' %
                        (dev.rsplit('/')[-1], attr.rsplit('/')[-1], val))
                    words.append('%s.%s=%s' % (dev.rsplit('/')[-1], attr.rsplit('/')[-1], val))
                    if len(words) > 5:

        if not words:
            words = ['(no settings)']
        return '<%s %s>' % (self.__class__.__name__, ' '.join(words))

    def getValue(self, dev, attr):
    def getValue(self, dev: EntityType, attr: AttrType):
        return self._compiled.get(dev, {}).get(attr, self._zeroForAttr(attr))

    def _vectorKeys(self, deviceAttrFilter=None):
        """stable order of all the dev,attr pairs for this type of settings"""
        raise NotImplementedError

    def asList(self):
    def asList(self) -> List[Tuple[EntityType, AttrType, VTUnion]]:
        """old style list of (dev, attr, val) tuples"""
        out = []
        for dev, av in self._compiled.items():
            for attr, val in av.items():
                out.append((dev, attr, val))
        return out

    def devices(self):
    def devices(self) -> List[EntityType]:
        return list(self._compiled.keys())

    def toVector(self, deviceAttrFilter=None):
    def toVector(self, deviceAttrFilter=None) -> List[float]:
        out: List[float] = []
        for dev, attr in self._vectorKeys(deviceAttrFilter):
            v = self.getValue(dev, attr)
            if attr == L9['color']:
                if not isinstance(v, float):
                    raise TypeError(f'{attr=} value was {v=}')
        return out

    def byDevice(self):
    def byDevice(self) -> Iterable[Tuple[EntityType, _Settings]]:
        for dev, av in self._compiled.items():
            yield dev, self.__class__._fromCompiled(self.graph, {dev: av})

    def ofDevice(self, dev):
        return self.__class__._fromCompiled(self.graph,
                                            {dev: self._compiled.get(dev, {})})
    def ofDevice(self, dev: EntityType) -> _Settings:
        return self.__class__._fromCompiled(self.graph, {dev: self._compiled.get(dev, {})})

    def distanceTo(self, other):
        diff = numpy.array(self.toVector()) - other.toVector()
        d = numpy.linalg.norm(diff, ord=None)
'distanceTo %r - %r = %g', self, other, d)
        return d

    def statements(self, subj, ctx, settingRoot, settingsSubgraphCache):
    def statements(self, subj: EntityType, ctx: URIRef, settingRoot: URIRef, settingsSubgraphCache: Set):
        settingRoot can be shared across images (or even wider if you want)
        # ported from
        add = []
        for i, (dev, attr, val) in enumerate(self.asList()):
            # hopefully a unique number for the setting so repeated settings converge
            settingHash = hash((dev, attr, val)) % 9999999
            setting = URIRef('%sset%s' % (settingRoot, settingHash))
            add.append((subj, L9['setting'], setting, ctx))
            if setting in settingsSubgraphCache:

            scaledAttributeTypes = [L9['color'], L9['brightness'], L9['uv']]
            settingType = L9[
                'scaledValue'] if attr in scaledAttributeTypes else L9['value']
            settingType = L9['scaledValue'] if attr in scaledAttributeTypes else L9['value']
            if not isinstance(val, URIRef):
                val = Literal(val)
                (setting, L9['device'], dev, ctx),
                (setting, L9['deviceAttr'], attr, ctx),
                (setting, settingType, val, ctx),

        return add


class DeviceSettings(_Settings):
    EntityType = DeviceUri
    AttrType = DeviceAttr

    def _vectorKeys(self, deviceAttrFilter=None):
        with self.graph.currentState() as g:
            devs = set()  # devclass, dev
            for dc in g.subjects(RDF.type, L9['DeviceClass']):
                for dev in g.subjects(RDF.type, dc):
                    devs.add((dc, dev))

            keys = []
            for dc, dev in sorted(devs):
                for attr in sorted(g.objects(dc, L9['deviceAttr'])):
                    key = (dev, attr)
                    if deviceAttrFilter and key not in deviceAttrFilter:
        return keys

    def fromResource(cls, graph: GraphType, subj: EntityType):
        settingsList = []
        with graph.currentState() as g:
            for s in g.objects(subj, L9['setting']):
                d = g.value(s, L9['device'])
                da = g.value(s, L9['deviceAttr'])
                v = getVal(g, s)
                settingsList.append((d, da, v))
        return cls(graph, settingsList)

    def fromVector(cls, graph, vector, deviceAttrFilter=None):
        compiled: Dict[DeviceSettings.EntityType, Dict[DeviceSettings.AttrType, VTUnion]] = {}
        i = 0
        for (d, a) in cls(graph, [])._vectorKeys(deviceAttrFilter):
            if a == L9['color']:
                v = toHex(vector[i:i + 3])
                i += 3
                v = vector[i]
                i += 1
            compiled.setdefault(d, {})[a] = v
        return cls._fromCompiled(graph, compiled)

    def merge(cls, graph: SyncedGraph, others: List[DeviceSettings]) -> DeviceSettings:
        return cls.fromList(graph, cast(List[_Settings], others))


class BareEffectSettings:
    # settings not specific to any effect
    s: Dict[EffectAttr, VTUnion]

    def withStrength(self, strength: float) -> BareEffectSettings:
        out = self.s.copy()
        out[EffectAttr(L9['strength'])] = strength
        return BareEffectSettings(s=out)


class EffectSettings(_Settings):
from typing import cast
import unittest
from light9.newtypes import DeviceAttr, DeviceUri, HexColor, VTUnion
from rdflib import Literal
from rdfdb.patch import Patch
from light9.localsyncedgraph import LocalSyncedGraph
from light9.namespaces import L9, DEV
from light9.effect.settings import DeviceSettings


class TestDeviceSettings(unittest.TestCase):

    def setUp(self):
        self.graph = LocalSyncedGraph(
            files=['test/cam/lightConfig.n3', 'test/cam/bg.n3'])

    def testToVectorZero(self):
        ds = DeviceSettings(self.graph, [])
        self.assertEqual([0] * 30, ds.toVector())

    def testEq(self):
        s1 = DeviceSettings(self.graph, [
            (L9['light1'], L9['attr1'], 0.5),
            (L9['light1'], L9['attr2'], 0.3),
        s2 = DeviceSettings(self.graph, [
            (L9['light1'], L9['attr2'], 0.3),
@@ -31,98 +33,98 @@ class TestDeviceSettings(unittest.TestCa
    def testMissingFieldsEqZero(self):
            DeviceSettings(self.graph, [
                (L9['aura1'], L9['rx'], 0),
            ]), DeviceSettings(self.graph, []))

    def testFalseIfZero(self):
            DeviceSettings(self.graph, [(L9['aura1'], L9['rx'], 0.1)]))
        self.assertFalse(DeviceSettings(self.graph, []))

    def testFromResource(self):
        ctx = L9['']
                (L9['foo'], L9['setting'], L9['foo_set0'], ctx),
                (L9['foo_set0'], L9['device'], L9['light1'], ctx),
                (L9['foo_set0'], L9['deviceAttr'], L9['brightness'], ctx),
                (L9['foo_set0'], L9['value'], Literal(0.1), ctx),
                (L9['foo'], L9['setting'], L9['foo_set1'], ctx),
                (L9['foo_set1'], L9['device'], L9['light1'], ctx),
                (L9['foo_set1'], L9['deviceAttr'], L9['speed'], ctx),
                (L9['foo_set1'], L9['scaledValue'], Literal(0.2), ctx),
        s = DeviceSettings.fromResource(self.graph, L9['foo'])
        s = DeviceSettings.fromResource(self.graph, DeviceUri(L9['foo']))

            DeviceSettings(self.graph, [
                (L9['light1'], L9['brightness'], 0.1),
                (L9['light1'], L9['speed'], 0.2),
            ]), s)

    def testToVector(self):
        v = DeviceSettings(self.graph, [
            (DEV['aura1'], L9['rx'], 0.5),
            (DEV['aura1'], L9['color'], '#00ff00'),
            (DeviceUri(DEV['aura1']), DeviceAttr(L9['rx']), 0.5),
            (DeviceUri(DEV['aura1']), DeviceAttr(L9['color']), HexColor('#00ff00')),
            0, 1, 0, 0.5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0, 0
        ], v)

    def testFromVector(self):
        s = DeviceSettings.fromVector(self.graph, [
            0, 1, 0, 0.5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0, 0

            DeviceSettings(self.graph, [
                (DEV['aura1'], L9['rx'], 0.5),
                (DEV['aura1'], L9['color'], '#00ff00'),
                (DeviceUri(DEV['aura1']), DeviceAttr(L9['rx']), 0.5),
                (DeviceUri(DEV['aura1']), DeviceAttr(L9['color']), HexColor('#00ff00')),
            ]), s)

    def testAsList(self):
        sets = [
            (L9['light1'], L9['attr2'], 0.3),
            (L9['light1'], L9['attr1'], 0.5),
            (DeviceUri(L9['light1']), DeviceAttr(L9['attr2']), cast(VTUnion,0.3)),
            (DeviceUri(L9['light1']), DeviceAttr(L9['attr1']), 0.5),
        self.assertCountEqual(sets, DeviceSettings(self.graph, sets).asList())

    def testDevices(self):
        s = DeviceSettings(self.graph, [
            (DEV['aura1'], L9['rx'], 0),
            (DEV['aura2'], L9['rx'], 0.1),
        # aura1 is all defaults (zeros), so it doesn't get listed
        self.assertCountEqual([DEV['aura2']], s.devices())

    def testAddStatements(self):
        s = DeviceSettings(self.graph, [
            (DEV['aura2'], L9['rx'], 0.1),
        stmts = s.statements(L9['foo'], L9['ctx1'], L9['s_'], set())
        stmts = s.statements(DeviceUri(L9['foo']), L9['ctx1'], L9['s_'], set())
        self.maxDiff = None
        setting = sorted(stmts)[-1][0]
            (L9['foo'], L9['setting'], setting, L9['ctx1']),
            (setting, L9['device'], DEV['aura2'], L9['ctx1']),
            (setting, L9['deviceAttr'], L9['rx'], L9['ctx1']),
            (setting, L9['value'], Literal(0.1), L9['ctx1']),
        ], stmts)

    def testDistanceTo(self):
        s1 = DeviceSettings(self.graph, [
            (DEV['aura1'], L9['rx'], 0.1),
            (DEV['aura1'], L9['ry'], 0.6),
        s2 = DeviceSettings(self.graph, [
            (DEV['aura1'], L9['rx'], 0.3),
            (DEV['aura1'], L9['ry'], 0.3),
        self.assertEqual(0.36055512754639896, s1.distanceTo(s2))


L1 = L9['light1']
ZOOM = L9['zoom']

@@ -136,29 +138,29 @@ class TestFromBlend(unittest.TestCase):
    def testSingle(self):
            DeviceSettings(self.graph, [(L1, ZOOM, 0.5)]),
                [(1, DeviceSettings(self.graph, [(L1, ZOOM, 0.5)]))]))

    def testScale(self):
            DeviceSettings(self.graph, [(L1, ZOOM, 0.1)]),
                [(.2, DeviceSettings(self.graph, [(L1, ZOOM, 0.5)]))]))

    def testMixFloats(self):
            DeviceSettings(self.graph, [(L1, ZOOM, 0.4)]),
            DeviceSettings.fromBlend(self.graph, [
                (.2, DeviceSettings(self.graph, [(L1, ZOOM, 0.5)])),
                (.3, DeviceSettings(self.graph, [(L1, ZOOM, 1.0)])),

    def testMixColors(self):
            DeviceSettings(self.graph, [(L1, ZOOM, '#503000')]),
            DeviceSettings(self.graph, [(L1, ZOOM, HexColor('#503000'))]),
            DeviceSettings.fromBlend(self.graph, [
                (.25, DeviceSettings(self.graph, [(L1, ZOOM, '#800000')])),
                (.5, DeviceSettings(self.graph, [(L1, ZOOM, '#606000')])),
                (.25, DeviceSettings(self.graph, [(L1, ZOOM, HexColor('#800000'))])),
                (.5, DeviceSettings(self.graph, [(L1, ZOOM, HexColor('#606000'))])),
import traceback
from light9.namespaces import L9, RDF
from light9.effect.scale import scale
from typing import Dict, List, Tuple, Any
from rdflib import URIRef


class SimpleOutputs:
    Watches graph for effects that are just fading output attrs. 
    Call `values` to get (dev,attr):value settings.

    def __init__(self, graph):
        self.graph = graph

        # effect : [(dev, attr, value, isScaled)]
        self.effectOutputs: Dict[URIRef, List[Tuple[URIRef, URIRef, Any, bool]]] = {}


    def updateEffectsFromGraph(self):
        for effect in self.graph.subjects(RDF.type, L9['Effect']):
            raise TypeError('change graph from Effect to EffectClass')

        for effect in self.graph.subjects(RDF.type, L9['EffectClass']):
            settings = []
            for setting in self.graph.objects(effect, L9['setting']):
                settingValues = dict(self.graph.predicate_objects(setting))
                    d = settingValues.get(L9['device'], None)
                    a = settingValues.get(L9['deviceAttr'], None)
                    v = settingValues.get(L9['value'], None)
                    sv = settingValues.get(L9['scaledValue'], None)
                    if not (bool(v) ^ bool(sv)):
                        raise NotImplementedError('no value for %s' % setting)
                    if d is None:
                        raise TypeError('no device on %s' % effect)
                    if a is None:
                        raise TypeError('no attr on %s' % effect)
                except Exception:

                settings.append((d, a, v if v is not None else sv, bool(sv)))

            if settings:
                self.effectOutputs[effect] = settings
            # also have to read eff :effectAttr [ :tint x; :tintStrength y ]

Show inline comments
import decimal
from typing import Tuple, NewType, Type, TypeVar, Union, cast
from rdflib import URIRef
from rdflib.term import Node

ClientType = NewType('ClientType', str)
ClientSessionType = NewType('ClientSessionType', str)
Curve = NewType('Curve', URIRef)
OutputUri = NewType('OutputUri', URIRef)  # e.g. dmxA
DeviceUri = NewType('DeviceUri', URIRef)  # e.g. :aura2
DeviceClass = NewType('DeviceClass', URIRef)  # e.g. :Aura
DmxIndex = NewType('DmxIndex', int)  # 1..512
DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
EffectClass = NewType('EffectClass', URIRef) # e.g. effect:chase
EffectAttr = NewType('EffectAttr', URIRef)  # e.g. :chaseSpeed
NoteUri = NewType('NoteUri', URIRef)
OutputAttr = NewType('OutputAttr', URIRef)  # e.g. :xFine
OutputValue = NewType('OutputValue', int)  # byte in dmx message
Song = NewType('Song', URIRef)
UnixTime = NewType('UnixTime', float)

VT = TypeVar('VT', float, int, str)  # remove
HexColor = NewType('HexColor', str)
VTUnion = Union[float, int, HexColor]  # rename to ValueType
DeviceSetting = Tuple[DeviceUri, DeviceAttr,
                      # currently, floats and hex color strings

# Alternate output range for a device. Instead of outputting 0.0 to
# 1.0, you can map that range into, say, 0.2 to 0.7
OutputRange = NewType('OutputRange', Tuple[float, float])

_ObjType = TypeVar('_ObjType')


def _isSubclass2(t1: Type, t2: Type) -> bool:
    """same as issubclass but t1 can be a NewType"""
    if hasattr(t1, '__supertype__'):
        t1 = t1.__supertype__
    return issubclass(t1, t2)


def typedValue(objType: Type[_ObjType], graph, subj, pred) -> _ObjType:
    """graph.value(subj, pred) with a given return type. 
    If objType is not an rdflib.Node, we toPython() the value."""
    obj = graph.value(subj, pred)
    if obj is None:
        raise ValueError()
    conv = obj if _isSubclass2(objType, Node) else obj.toPython()
    # may need to turn Decimal to float here
    if objType is float and isinstance(conv, decimal.Decimal):
        conv = float(conv)
    return cast(objType, conv)
\ No newline at end of file
