Changeset - 9fc653ee7fff
[Not reviewed]
default
0 8 2
drewp@bigasterisk.com - 20 months ago 2023-05-20 21:49:15
drewp@bigasterisk.com
WIP redoing how Note works. The new Note outputs EffectSettings only,
and callers have to effect_eval them. Still not sure what SimpleOutputs does.
10 files changed with 401 insertions and 160 deletions:
0 comments (0 inline, 0 general)
light9/effect/effecteval.py
Show inline comments
 
from rdflib import Literal, URIRef, Namespace
 
from light9.namespaces import L9, DEV
 
from webcolors import rgb_to_hex, hex_to_rgb
 
from dataclasses import dataclass
 
import logging
 
import math
 
import random
 
from colorsys import hsv_to_rgb
 
import math
 
from typing import Any, Dict, Tuple
 
from light9.effect.simple_outputs import SimpleOutputs
 
from light9.newtypes import DeviceAttr, DeviceUri, EffectAttr, EffectClass, VTUnion
 

	
 
from noise import pnoise1
 
import logging
 
from light9.effect.settings import DeviceSettings
 
from PIL import Image
 
from rdflib import Literal, Namespace, URIRef
 
from webcolors import hex_to_rgb, rgb_to_hex
 

	
 
from light9.effect.scale import scale
 
from typing import Dict, Tuple, Any
 
from PIL import Image
 
import random
 
from light9.effect.settings import BareEffectSettings, DeviceSettings, EffectSettings
 
from light9.namespaces import DEV, L9
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 

	
 
SKY = Namespace('http://light9.bigasterisk.com/theater/skyline/device/')
 

	
 
random.seed(0)
 

	
 
log = logging.getLogger('effecteval')
 
@@ -61,52 +67,51 @@ def clamp255(x):
 
def _8bit(f):
 
    if not isinstance(f, (int, float)):
 
        raise TypeError(repr(f))
 
    return clamp255(int(f * 255))
 

	
 

	
 
@dataclass
 
class EffectEval:
 
    """
 
    runs one effect's code to turn effect attr settings into output
 
    device settings. No state; suitable for reload().
 
    """
 
    graph: SyncedGraph
 
    effect: EffectClass
 
    simpleOutputs: SimpleOutputs
 

	
 
    def __init__(self, graph, effect, simpleOutputs):
 
        self.graph = graph
 
        self.effect = effect
 
        self.simpleOutputs = simpleOutputs
 

	
 
    def outputFromEffect(self, effectSettings, songTime, noteTime):
 
    def outputFromEffect(self, effectSettings: BareEffectSettings, songTime: float, noteTime: float) -> Tuple[EffectSettings, Dict]:
 
        """
 
        From effect attr settings, like strength=0.75, to output device
 
        settings like light1/bright=0.72;light2/bright=0.78. This runs
 
        the effect code.
 
        """
 
        # both callers need to apply note overrides
 
        effectSettings = dict(effectSettings)  # we should make everything into nice float and Color objects too
 

	
 
        strength = float(effectSettings[L9['strength']])
 
        print(f'outputFromEffect({effectSettings=}, {songTime=}, {noteTime=})')
 
        strength = float(effectSettings.s[EffectAttr(L9['strength'])])
 
        if strength <= 0:
 
            return DeviceSettings(self.graph, []), {'zero': True}
 
            return EffectSettings(self.graph, []), {'zero': True}
 

	
 
        report = {}
 
        out: Dict[Tuple[URIRef, URIRef], Any] = {}  # (dev, attr): value
 
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}  # (dev, attr): value
 

	
 
        out.update(self.simpleOutputs.values(self.effect, strength, effectSettings.get(L9['colorScale'], None)))
 
        out.update(self.simpleOutputs.values(self.effect, strength, effectSettings.s.get(EffectAttr(L9['colorScale']), None)))
 

	
 
        if self.effect.startswith(L9['effect/']):
 
            tail = 'effect_' + self.effect[len(L9['effect/']):]
 
            try:
 
                func = globals()[tail]
 
            except KeyError:
 
                report['error'] = 'effect code not found for %s' % self.effect
 
            else:
 
                out.update(func(effectSettings, strength, songTime, noteTime))
 

	
 
        outList = [(d, a, v) for (d, a), v in out.items()]
 
        return DeviceSettings(self.graph, outList), report
 
        print(f'{outList=}')
 
        return EffectSettings(self.graph, outList), report
 

	
 

	
 
def effect_Curtain(effectSettings, strength, songTime, noteTime):
 
    return {(L9['device/lowPattern%s' % n], L9['color']): literalColor(strength, strength, strength) for n in range(301, 308 + 1)}
 

	
 

	
light9/effect/sequencer/eval_faders.py
Show inline comments
 

	
 
import asyncio
 
import logging
 
import time
 
from typing import Callable, Coroutine, List, cast
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
@@ -10,15 +9,15 @@ from rdflib import URIRef
 
from light9.effect import effecteval
 
from light9.effect.sequencer import Note
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from light9.metrics import metrics
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import NoteUri
 
from light9.newtypes import NoteUri, UnixTime
 

	
 
log = logging.getLogger('sequencer')
 
log = logging.getLogger('seq.fader')
 

	
 
class FaderEval:
 
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector
 
    
 
    The current faders become Notes in here, for more code reuse.
 
    """
 
@@ -34,14 +33,14 @@ class FaderEval:
 

	
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.lastLoopSucceeded = False
 

	
 
        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        log.info('startupdating task')
 
        asyncio.create_task(self.startUpdating())
 
        # have caller do this
 
        #asyncio.create_task(self.startUpdating())
 

	
 
    async def startUpdating(self):
 
        await self.graph.addAsyncHandler(self.update)
 
        log.info('startupdating task done')
 

	
 
    def onCodeChange(self):
 
@@ -55,28 +54,37 @@ class FaderEval:
 
        self.notes = []
 
        for fader in self.graph.subjects(RDF.type, L9['Fader']):          
 
            def compileFader() -> Note:
 
                return self.compileFader(cast(URIRef, fader))
 

	
 
            self.notes.append(compileFader())
 
        if self.notes:
 
            asyncio.create_task(self.startUpdating())
 
        # if self.notes:
 
        #     asyncio.create_task(self.startUpdating())
 

	
 

	
 
    @metrics('compile_fader').time()
 
    def compileFader(self, fader: URIRef) -> Note:
 
        return Note(self.graph, NoteUri(cast(NoteUri, fader)), effecteval,
 
                self.simpleOutputs, timed=False)
 
        return Note(self.graph, cast(NoteUri, fader), 
 
                    timed=False)
 
    
 
    def _computeOutput(self) -> DeviceSettings:
 
        notesSettings = []
 
        now = UnixTime(time.time())
 
        for note in self.notes:
 
            effectSettings, report = note.outputCurrent()
 

	
 
            ee = effecteval.EffectEval(self.graph, note.effectClass, self.simpleOutputs)
 
            deviceSettings, report = ee.outputFromEffect(
 
                effectSettings, 
 
                songTime=now, # probably wrong
 
                noteTime=now, # wrong
 
                )
 
            notesSettings.append(deviceSettings)
 
        return DeviceSettings.merge(self.graph, notesSettings)
 

	
 
    
 
    @metrics('update_call_fader').time()
 
    async def update(self):
 
        settings = []
 
        for note in self.notes:
 
            effectValue = self.graph.value(note.uri, L9['value'])
 
            if effectValue is None:
 
                log.info(f'skip note {note}, no :value')
 
                continue
 
            s, report = note.outputSettings(t=time.time(), strength=float(effectValue))
 
            settings.append(s)
 
        devSettings = DeviceSettings.fromList(self.graph, settings)
 
        log.info(f'update {len(self.notes)=}')
 
        devSettings = self._computeOutput()
 
        with metrics('update_s3_send_fader').time():  # our measurement
 
            sendSecs = await self.sendToCollector(devSettings)
light9/effect/sequencer/eval_faders_test.py
Show inline comments
 
new file 100644
 
from unittest import mock
 
from light9.effect.sequencer.eval_faders import FaderEval
 
from light9.effect.settings import DeviceSettings
 
from light9.mock_syncedgraph import MockSyncedGraph
 
from light9.namespaces import L9
 

	
 

	
 
PREFIXES = '''
 
@prefix : <http://light9.bigasterisk.com/> .
 
@prefix effect: <http://light9.bigasterisk.com/effect/> .
 
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
 
@prefix show: <http://light9.bigasterisk.com/show/dance2023/> .
 
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
 
@prefix dev: <http://light9.bigasterisk.com/theater/test/device/> .
 
@prefix dmxA: <http://light9.bigasterisk.com/output/dmxA/> .
 
'''
 

	
 
NOTE_GRAPH = PREFIXES + '''
 
            :brightness         
 
                a :DeviceAttr; 
 
                rdfs:label "brightness"; 
 
                :dataType :scalar .
 

	
 
            :strength
 
                a :EffectAttr;
 
                rdfs:label "strength" .
 

	
 
            :SimpleDimmer 
 
                a :DeviceClass; 
 
                rdfs:label "SimpleDimmer";
 
                :deviceAttr :brightness;
 
                :attr [ :outputAttr :level; :dmxOffset 0 ] .
 
                
 
            :light1  
 
                a :SimpleDimmer;
 
                :dmxUniverse dmxA:;
 
                :dmxBase 178 .
 

	
 
            effect:effect1 
 
                a :EffectClass;
 
                :setting effect:effect1_set1 .
 
            effect:effect1_set1 
 
                :device :light1; 
 
                :deviceAttr :brightness; 
 
                :scaledValue 0.5 .
 
            :fade1 
 
                a :Fader; 
 
                :effectClass effect:effect1; 
 
                :effectAttr :strength; 
 
                :value 0.6 .
 

	
 
        '''
 

	
 
class TestFaderEval:
 
    def test_faderValueScalesEffectSettings(self):
 
        g = MockSyncedGraph(NOTE_GRAPH)
 
        sender = mock.MagicMock()
 
        
 
        f = FaderEval(g, sender)
 
        devSettings = f._computeOutput()
 
        assert devSettings == DeviceSettings(g, [(L9['light1'], L9['brightness'], 0.3)])
 
\ No newline at end of file
light9/effect/sequencer/note.py
Show inline comments
 
import bisect
 
from dataclasses import dataclass
 
import logging
 
import time
 
from decimal import Decimal
 
from typing import Any, Dict, List, Optional, Tuple, Union, cast
 
from light9.effect.effecteval import EffectEval
 
from light9.effect.settings import BareEffectSettings, DeviceSettings, EffectSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import Literal, URIRef
 

	
 
from light9.namespaces import L9
 
from light9.newtypes import Curve, DeviceAttr, DeviceUri, NoteUri, typedValue
 
from light9.newtypes import Curve, DeviceAttr, EffectAttr, EffectClass, NoteUri, VTUnion, typedValue
 

	
 
log = logging.getLogger('sequencer')
 

	
 

	
 
def pyType(n):
 
    ret = n.toPython()
 
    if isinstance(ret, Decimal):
 
        return float(ret)
 
    return ret
 

	
 

	
 
def prettyFormat(x: Union[float, str]):
 
    if isinstance(x, float):
 
        return round(x, 4)
 
    return x
 

	
 

	
 
@dataclass
 
class Note:
 
    """A Note outputs EffectAttr settings.
 
    
 
    Sample graph:
 
     :note1 a :Note; :curve :n1c1; :effectClass effect:allcolor;
 

	
 
    It can animate the EffectAttr settings over time, in two ways:
 
     * a `timed note` has an envelope curve that animates 
 
       the :strength EffectAttr over time
 
     * an `untimed note` has no curve, a fixed strength, but 
 
       still passes the wall clock time to its effect so the 
 
       effect can include animation. A `Fader` is an untimed note.
 

	
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs, timed=True):
 
        g = self.graph = graph
 
        self.uri = uri
 
        self.timed = timed
 
        self.effectEval = effectevalModule.EffectEval(graph, g.value(uri, L9['effectClass']), simpleOutputs)
 
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
 
        for s in g.objects(uri, L9['setting']):
 
            settingValues = dict(g.predicate_objects(s))
 
            ea = cast(URIRef, settingValues[L9['effectAttr']])
 
            self.baseEffectSettings[ea] = pyType(settingValues[L9['value']])
 
    This obj is immutable, I think, but the graph can change, 
 
    which can affect the output. However, I think this doesn't 
 
    do its own rebuilds, and it's up to the caller to addHandler 
 
    around the creation of Note objects.
 
    """
 
    graph: SyncedGraph
 
    uri: NoteUri
 
    # simpleOutputs: SimpleOutputs
 
    timed: bool = True
 

	
 
        if timed:
 
    def __post_init__(self):
 
        ec = self.graph.value(self.uri, L9['effectClass'])
 
        if ec is None:
 
            raise ValueError(f'note {self.uri} has no :effectClass')
 
        self.effectClass = EffectClass(ec)
 

	
 
            def floatVal(s, p):
 
                return typedValue(float, g, s, p)
 
        self.baseEffectSettings = self.getBaseEffectSettings()
 

	
 
            originTime = floatVal(uri, L9['originTime'])
 
        if self.timed:
 
            originTime = typedValue(float, self.graph, self.uri, L9['originTime'])
 
            self.points: List[Tuple[float, float]] = []
 
            for curve in g.objects(uri, L9['curve']):
 
            for curve in self.graph.objects(self.uri, L9['curve']):
 
                self.points.extend(self.getCurvePoints(cast(Curve, curve), L9['strength'], originTime))
 
            self.points.sort()
 
        else:
 
            self.points = []
 

	
 
    def getBaseEffectSettings(self) -> BareEffectSettings:
 
        """i think these are settings that are fixed over time, 
 
        e.g. that you set in the note's body in the timeline editor
 
        """
 
        out: Dict[EffectAttr, VTUnion] = {}
 
        for s in self.graph.objects(self.uri, L9['setting']):
 
            settingValues = dict(self.graph.predicate_objects(s))
 
            ea = cast(EffectAttr, settingValues[L9['effectAttr']])
 
            out[ea] = pyType(settingValues[L9['value']])
 
        return BareEffectSettings(s=out)
 

	
 
    def getCurvePoints(self, curve: Curve, attr, originTime: float) -> List[Tuple[float, float]]:
 
        points = []
 
        po = list(self.graph.predicate_objects(curve))
 
        if dict(po).get(L9['attr'], None) != attr:
 
            return []
 
        for point in [row[1] for row in po if row[0] == L9['point']]:
 
@@ -78,39 +114,42 @@ class Note:
 

	
 
        p1, p2 = self.points[i], self.points[i + 1]
 
        frac = (t - p1[0]) / (p2[0] - p1[0])
 
        y = p1[1] + (p2[1] - p1[1]) * frac
 
        return y
 

	
 
    def outputSettings(self, t: float, strength: Optional[float] = None) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
 
        """
 
        list of (device, attr, value), and a report for web
 
        """
 
    def outputCurrent(self):      
 
        st = typedValue(float, self.graph, self.uri, L9['value'])
 
        return self._outputSettings(t=None, strength=st)
 

	
 
    def _outputSettings(
 
            self,
 
            t: float | None,
 
            strength: Optional[float] = None  #
 
    ) -> Tuple[BareEffectSettings, Dict]:
 
        
 
        if t is None:
 
            if self.timed:
 
                raise TypeError()
 
            t = time.time()  # so live effects will move
 
        report = {
 
        report:Dict[str,Any] = {
 
            'note': str(self.uri),
 
            'effectClass': self.effectEval.effect,
 
            'effectClass': str(self.effectClass),
 
        }
 

	
 
        strengthAttr = cast(DeviceAttr, L9['strength'])
 

	
 
        effectSettings: Dict[DeviceAttr, Union[float, str]] = dict((DeviceAttr(da), v) for da, v in self.baseEffectSettings.items())
 
        effectSettings[strengthAttr] = self.evalCurve(t) if strength is None else strength
 
        s = self.evalCurve(t) if strength is None else strength
 
        out = self.baseEffectSettings.withStrength(s)
 
        report['effectSettings'] = dict((str(k), prettyFormat(v)) for k, v in sorted(out.s.items()))
 
        report['nonZero'] = s > 0
 

	
 
        def prettyFormat(x: Union[float, str]):
 
            if isinstance(x, float):
 
                return round(x, 4)
 
            return x
 
        return out, report
 

	
 
        report['effectSettings'] = dict((str(k), prettyFormat(v)) for k, v in sorted(effectSettings.items()))
 
        report['nonZero'] = cast(float, effectSettings[strengthAttr]) > 0
 
        # old api had this
 

	
 
        startTime = self.points[0][0] if self.timed else 0
 
        out, evalReport = self.effectEval.outputFromEffect(
 
            list(effectSettings.items()),
 
            effectSettings,
 
            songTime=t,
 
            # note: not using origin here since it's going away
 
            noteTime=t - startTime)
 
        report['devicesAffected'] = len(out.devices())
 
        return out, report
light9/effect/sequencer/note_test.py
Show inline comments
 
new file 100644
 
import pytest
 

	
 
from light9.effect.sequencer import Note
 
from light9.effect.settings import BareEffectSettings
 
from light9.mock_syncedgraph import MockSyncedGraph
 
from light9.namespaces import L9
 
from light9.newtypes import EffectAttr, NoteUri
 

	
 
PREFIXES = '''
 
@prefix : <http://light9.bigasterisk.com/> .
 
@prefix effect: <http://light9.bigasterisk.com/effect/> .
 
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
 
@prefix show: <http://light9.bigasterisk.com/show/dance2023/> .
 
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
 
@prefix dev: <http://light9.bigasterisk.com/theater/test/device/> .
 
@prefix dmxA: <http://light9.bigasterisk.com/output/dmxA/> .
 
'''
 

	
 
FADER_GRAPH = PREFIXES + '''
 
    :fade1 
 
        a :Fader; 
 
        :effectClass effect:effect1; 
 
        :effectAttr :strength; 
 
        :value 0.6 .
 
'''
 

	
 

	
 
class TestUntimedFaderNote:
 

	
 
    def test_returnsEffectSettings(self):
 
        g = MockSyncedGraph(FADER_GRAPH)
 
        n = Note(g, NoteUri(L9['fade1']), timed=False)
 
        out, report = n.outputCurrent()
 
        assert report['effectSettings'] == {'http://light9.bigasterisk.com/strength': 0.6}
 
        assert out == BareEffectSettings(s={EffectAttr(L9['strength']): 0.6})
 

	
 

	
 
NOTE_GRAPH = PREFIXES + '''
 
            :brightness         
 
                a :DeviceAttr; 
 
                rdfs:label "brightness"; 
 
                :dataType :scalar .
 

	
 
            :strength
 
                a :EffectAttr;
 
                rdfs:label "strength" .
 

	
 
            :SimpleDimmer 
 
                a :DeviceClass; 
 
                rdfs:label "SimpleDimmer";
 
                :deviceAttr :brightness;
 
                :attr [ :outputAttr :level; :dmxOffset 0 ] .
 
                
 
            dev:light1  
 
                a :SimpleDimmer;
 
                :dmxUniverse dmxA:;
 
                :dmxBase 178 .
 

	
 
            effect:effect1 
 
                a :EffectClass;
 
                :setting effect:effect1_set1 .
 
            effect:effect1_set1 
 
                :device dev:light1; 
 
                :deviceAttr :brightness; 
 
                :scaledValue 0.5 .
 
            :fade1 
 
                a :Fader; 
 
                :effectClass effect:effect1; 
 
                :effectAttr :strength; 
 
                :value 0.6 .
 

	
 
        '''
 

	
 

	
 
class TestTimedNote:
 

	
 
    @pytest.mark.skip()
 
    def test_scalesStrengthWithCurve(self):
 
        pass
light9/effect/sequencer/sequencer.py
Show inline comments
 
@@ -97,14 +97,13 @@ class Sequencer:
 
    @metrics('compile_song').time()
 
    def compileSong(self, song: Song) -> None:
 
        anyErrors = False
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            try:
 
                n = Note(self.graph, NoteUri(cast(NoteUri, note)), effecteval,
 
                         self.simpleOutputs)
 
                n = Note(self.graph, NoteUri(cast(NoteUri, note)))
 
            except Exception:
 
                log.warn(f"failed to build Note {note} - skipping")
 
                anyErrors = True
 
                continue
 
            self.notes[song].append(n)
 
        if not anyErrors:
light9/effect/settings.py
Show inline comments
 
"""
 
Data structure and convertors for a table of (device,attr,value)
 
rows. These might be effect attrs ('strength'), device attrs ('rx'),
 
or output attrs (dmx channel).
 

	
 
BareSettings means (attr,value), no device.
 
"""
 
from __future__ import annotations
 

	
 
import decimal
 
import logging
 
from dataclasses import dataclass
 
from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple, Union, cast
 
from light9.localsyncedgraph import LocalSyncedGraph
 

	
 
import numpy
 
from rdflib import URIRef, Literal
 
from light9.namespaces import RDF, L9
 
import logging
 
from rdflib import Literal, URIRef
 

	
 
from light9.collector.device import resolve
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import DeviceAttr, DeviceUri, EffectAttr, HexColor, VTUnion
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 

	
 
log = logging.getLogger('settings')
 
from light9.collector.device import resolve
 
from typing import Sequence, Dict, Union, List
 

	
 

	
 
def parseHex(h):
 
    if h[0] != '#': raise ValueError(h)
 
    if h[0] != '#':
 
        raise ValueError(h)
 
    return [int(h[i:i + 2], 16) for i in (1, 3, 5)]
 

	
 

	
 
def parseHexNorm(h):
 
    return [x / 255 for x in parseHex(h)]
 

	
 

	
 
def toHex(rgbFloat: Sequence[float]) -> str:
 
def toHex(rgbFloat: Sequence[float]) -> HexColor:
 
    assert len(rgbFloat) == 3
 
    scaled = (max(0, min(255, int(v * 255))) for v in rgbFloat)
 
    return '#%02x%02x%02x' % tuple(scaled)  # type: ignore
 
    return HexColor('#%02x%02x%02x' % tuple(scaled))
 

	
 

	
 
def getVal(graph, subj):
 
    lit = graph.value(subj, L9['value']) or graph.value(subj, L9['scaledValue'])
 
    ret = lit.toPython()
 
    if isinstance(ret, decimal.Decimal):
 
        ret = float(ret)
 
    return ret
 

	
 

	
 
GraphType = SyncedGraph | LocalSyncedGraph
 

	
 

	
 
class _Settings:
 
    """
 
    Generic for DeviceUri/DeviceAttr/VTUnion or EffectClass/EffectAttr/VTUnion
 

	
 
    default values are 0 or '#000000'. Internal rep must not store zeros or some
 
    comparisons will break.
 
    """
 
    EntityType = DeviceUri
 
    AttrType = DeviceAttr
 

	
 
    def __init__(self, graph, settingsList):
 
    def __init__(self, graph: GraphType, settingsList: List[Tuple[Any, Any, VTUnion]]):
 
        self.graph = graph  # for looking up all possible attrs
 
        self._compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {
 
        }  # dev: { attr: val }; val is number or colorhex
 
        self._compiled: Dict[self.__class__.EntityType, Dict[self.__class__.AttrType, VTUnion]] = {}
 
        for row in settingsList:
 
            self._compiled.setdefault(row[0], {})[row[1]] = row[2]
 
        # self._compiled may not be final yet- see _fromCompiled
 
        self._delZeros()
 

	
 
    @classmethod
 
    def _fromCompiled(cls, graph, compiled):
 
    def _fromCompiled(cls, graph: GraphType, compiled: Dict[EntityType, Dict[AttrType, VTUnion]]):
 
        obj = cls(graph, [])
 
        obj._compiled = compiled
 
        obj._delZeros()
 
        return obj
 

	
 
    @classmethod
 
    def fromResource(cls, graph, subj):
 
        settingsList = []
 
        with graph.currentState() as g:
 
            for s in g.objects(subj, L9['setting']):
 
                d = g.value(s, L9['device'])
 
                da = g.value(s, L9['deviceAttr'])
 
                v = getVal(g, s)
 
                settingsList.append((d, da, v))
 
        return cls(graph, settingsList)
 

	
 
    @classmethod
 
    def fromVector(cls, graph, vector, deviceAttrFilter=None):
 
        compiled: Dict[URIRef, Dict[URIRef, Union[float, str]]] = {}
 
        i = 0
 
        for (d, a) in cls(graph, [])._vectorKeys(deviceAttrFilter):
 
            if a == L9['color']:
 
                v = toHex(vector[i:i + 3])
 
                i += 3
 
            else:
 
                v = vector[i]
 
                i += 1
 
            compiled.setdefault(d, {})[a] = v
 
        return cls._fromCompiled(graph, compiled)
 

	
 
    @classmethod
 
    def fromList(cls, graph, others):
 
    def fromList(cls, graph: GraphType, others: List[_Settings]):
 
        """note that others may have multiple values for an attr"""
 
        out = cls(graph, [])
 
        self = cls(graph, [])
 
        for s in others:
 
            if not isinstance(s, cls):
 
                raise TypeError(s)
 
            # if not isinstance(s, cls):
 
            #     raise TypeError(s)
 
            for row in s.asList():  # could work straight from s._compiled
 
                if row[0] is None:
 
                    raise TypeError('bad row %r' % (row,))
 
                dev, devAttr, value = row
 
                devDict = out._compiled.setdefault(dev, {})
 
                devDict = self._compiled.setdefault(dev, {})
 
                if devAttr in devDict:
 
                    value = resolve(dev, devAttr, [devDict[devAttr], value])
 
                    existingVal: VTUnion = devDict[devAttr]
 
                    raise NotImplementedError('fixme: dev is to be a deviceclass (but it is currently unused)')
 
                    value = resolve(dev, devAttr, [existingVal, value])
 
                devDict[devAttr] = value
 
        out._delZeros()
 
        return out
 
        self._delZeros()
 
        return self
 

	
 
    @classmethod
 
    def fromBlend(cls, graph, others):
 
    def _mult(cls, weight, row, dd) -> VTUnion:
 
        if isinstance(row[2], str):
 
            prev = parseHexNorm(dd.get(row[1], '#000000'))
 
            return toHex(prev + weight * numpy.array(parseHexNorm(row[2])))
 
        else:
 
            return dd.get(row[1], 0) + weight * row[2]
 

	
 
    @classmethod
 
    def fromBlend(cls, graph: GraphType, others: List[Tuple[float, _Settings]]):
 
        """others is a list of (weight, Settings) pairs"""
 
        out = cls(graph, [])
 
        for weight, s in others:
 
            if not isinstance(s, cls):
 
                raise TypeError(s)
 
            for row in s.asList():  # could work straight from s._compiled
 
                if row[0] is None:
 
                    raise TypeError('bad row %r' % (row,))
 
                dd = out._compiled.setdefault(row[0], {})
 

	
 
                if isinstance(row[2], str):
 
                    prev = parseHexNorm(dd.get(row[1], '#000000'))
 
                    newVal = toHex(prev +
 
                                   weight * numpy.array(parseHexNorm(row[2])))
 
                else:
 
                    newVal = dd.get(row[1], 0) + weight * row[2]
 
                newVal = cls._mult(weight, row, dd)
 
                dd[row[1]] = newVal
 
        out._delZeros()
 
        return out
 

	
 
    def _zeroForAttr(self, attr):
 
    def _zeroForAttr(self, attr: AttrType) -> VTUnion:
 
        if attr == L9['color']:
 
            return '#000000'
 
            return HexColor('#000000')
 
        return 0.0
 

	
 
    def _delZeros(self):
 
        for dev, av in list(self._compiled.items()):
 
            for attr, val in list(av.items()):
 
                if val == self._zeroForAttr(attr):
 
                    del av[attr]
 
            if not av:
 
                del self._compiled[dev]
 

	
 
    def __hash__(self):
 
        itemed = tuple([(d, tuple([(a, v)
 
                                   for a, v in sorted(av.items())]))
 
                        for d, av in sorted(self._compiled.items())])
 
        itemed = tuple([(d, tuple([(a, v) for a, v in sorted(av.items())])) for d, av in sorted(self._compiled.items())])
 
        return hash(itemed)
 

	
 
    def __eq__(self, other):
 
        if not issubclass(other.__class__, self.__class__):
 
            raise TypeError("can't compare %r to %r" %
 
                            (self.__class__, other.__class__))
 
            raise TypeError("can't compare %r to %r" % (self.__class__, other.__class__))
 
        return self._compiled == other._compiled
 

	
 
    def __ne__(self, other):
 
        return not self == other
 

	
 
    def __bool__(self):
 
@@ -157,65 +152,66 @@ class _Settings:
 
    def __repr__(self):
 
        words = []
 

	
 
        def accum():
 
            for dev, av in self._compiled.items():
 
                for attr, val in sorted(av.items()):
 
                    words.append(
 
                        '%s.%s=%s' %
 
                        (dev.rsplit('/')[-1], attr.rsplit('/')[-1], val))
 
                    words.append('%s.%s=%s' % (dev.rsplit('/')[-1], attr.rsplit('/')[-1], val))
 
                    if len(words) > 5:
 
                        words.append('...')
 
                        return
 

	
 
        accum()
 
        if not words:
 
            words = ['(no settings)']
 
        return '<%s %s>' % (self.__class__.__name__, ' '.join(words))
 

	
 
    def getValue(self, dev, attr):
 
    def getValue(self, dev: EntityType, attr: AttrType):
 
        return self._compiled.get(dev, {}).get(attr, self._zeroForAttr(attr))
 

	
 
    def _vectorKeys(self, deviceAttrFilter=None):
 
        """stable order of all the dev,attr pairs for this type of settings"""
 
        raise NotImplementedError
 

	
 
    def asList(self):
 
    def asList(self) -> List[Tuple[EntityType, AttrType, VTUnion]]:
 
        """old style list of (dev, attr, val) tuples"""
 
        out = []
 
        for dev, av in self._compiled.items():
 
            for attr, val in av.items():
 
                out.append((dev, attr, val))
 
        return out
 

	
 
    def devices(self):
 
    def devices(self) -> List[EntityType]:
 
        return list(self._compiled.keys())
 

	
 
    def toVector(self, deviceAttrFilter=None):
 
    def toVector(self, deviceAttrFilter=None) -> List[float]:
 
        out: List[float] = []
 
        for dev, attr in self._vectorKeys(deviceAttrFilter):
 
            v = self.getValue(dev, attr)
 
            if attr == L9['color']:
 
                out.extend(parseHexNorm(v))
 
            else:
 
                if not isinstance(v, float):
 
                    raise TypeError(f'{attr=} value was {v=}')
 
                out.append(v)
 
        return out
 

	
 
    def byDevice(self):
 
    def byDevice(self) -> Iterable[Tuple[EntityType, _Settings]]:
 
        for dev, av in self._compiled.items():
 
            yield dev, self.__class__._fromCompiled(self.graph, {dev: av})
 

	
 
    def ofDevice(self, dev):
 
        return self.__class__._fromCompiled(self.graph,
 
                                            {dev: self._compiled.get(dev, {})})
 
    def ofDevice(self, dev: EntityType) -> _Settings:
 
        return self.__class__._fromCompiled(self.graph, {dev: self._compiled.get(dev, {})})
 

	
 
    def distanceTo(self, other):
 
        diff = numpy.array(self.toVector()) - other.toVector()
 
        d = numpy.linalg.norm(diff, ord=None)
 
        log.info('distanceTo %r - %r = %g', self, other, d)
 
        return d
 

	
 
    def statements(self, subj, ctx, settingRoot, settingsSubgraphCache):
 
    def statements(self, subj: EntityType, ctx: URIRef, settingRoot: URIRef, settingsSubgraphCache: Set):
 
        """
 
        settingRoot can be shared across images (or even wider if you want)
 
        """
 
        # ported from live.coffee
 
        add = []
 
        for i, (dev, attr, val) in enumerate(self.asList()):
 
@@ -224,14 +220,13 @@ class _Settings:
 
            setting = URIRef('%sset%s' % (settingRoot, settingHash))
 
            add.append((subj, L9['setting'], setting, ctx))
 
            if setting in settingsSubgraphCache:
 
                continue
 

	
 
            scaledAttributeTypes = [L9['color'], L9['brightness'], L9['uv']]
 
            settingType = L9[
 
                'scaledValue'] if attr in scaledAttributeTypes else L9['value']
 
            settingType = L9['scaledValue'] if attr in scaledAttributeTypes else L9['value']
 
            if not isinstance(val, URIRef):
 
                val = Literal(val)
 
            add.extend([
 
                (setting, L9['device'], dev, ctx),
 
                (setting, L9['deviceAttr'], attr, ctx),
 
                (setting, settingType, val, ctx),
 
@@ -239,12 +234,14 @@ class _Settings:
 
            settingsSubgraphCache.add(setting)
 

	
 
        return add
 

	
 

	
 
class DeviceSettings(_Settings):
 
    EntityType = DeviceUri
 
    AttrType = DeviceAttr
 

	
 
    def _vectorKeys(self, deviceAttrFilter=None):
 
        with self.graph.currentState() as g:
 
            devs = set()  # devclass, dev
 
            for dc in g.subjects(RDF.type, L9['DeviceClass']):
 
                for dev in g.subjects(RDF.type, dc):
 
@@ -255,6 +252,50 @@ class DeviceSettings(_Settings):
 
                for attr in sorted(g.objects(dc, L9['deviceAttr'])):
 
                    key = (dev, attr)
 
                    if deviceAttrFilter and key not in deviceAttrFilter:
 
                        continue
 
                    keys.append(key)
 
        return keys
 

	
 
    @classmethod
 
    def fromResource(cls, graph: GraphType, subj: EntityType):
 
        settingsList = []
 
        with graph.currentState() as g:
 
            for s in g.objects(subj, L9['setting']):
 
                d = g.value(s, L9['device'])
 
                da = g.value(s, L9['deviceAttr'])
 
                v = getVal(g, s)
 
                settingsList.append((d, da, v))
 
        return cls(graph, settingsList)
 

	
 
    @classmethod
 
    def fromVector(cls, graph, vector, deviceAttrFilter=None):
 
        compiled: Dict[DeviceSettings.EntityType, Dict[DeviceSettings.AttrType, VTUnion]] = {}
 
        i = 0
 
        for (d, a) in cls(graph, [])._vectorKeys(deviceAttrFilter):
 
            if a == L9['color']:
 
                v = toHex(vector[i:i + 3])
 
                i += 3
 
            else:
 
                v = vector[i]
 
                i += 1
 
            compiled.setdefault(d, {})[a] = v
 
        return cls._fromCompiled(graph, compiled)
 

	
 
    @classmethod
 
    def merge(cls, graph: SyncedGraph, others: List[DeviceSettings]) -> DeviceSettings:
 
        return cls.fromList(graph, cast(List[_Settings], others))
 

	
 

	
 
@dataclass
 
class BareEffectSettings:
 
    # settings not specific to any effect
 
    s: Dict[EffectAttr, VTUnion]
 

	
 
    def withStrength(self, strength: float) -> BareEffectSettings:
 
        out = self.s.copy()
 
        out[EffectAttr(L9['strength'])] = strength
 
        return BareEffectSettings(s=out)
 

	
 

	
 
class EffectSettings(_Settings):
 
    pass
light9/effect/settings_test.py
Show inline comments
 
from typing import cast
 
import unittest
 
from light9.newtypes import DeviceAttr, DeviceUri, HexColor, VTUnion
 
from rdflib import Literal
 
from rdfdb.patch import Patch
 
from light9.localsyncedgraph import LocalSyncedGraph
 
from light9.namespaces import L9, DEV
 
from light9.effect.settings import DeviceSettings
 

	
 
@@ -49,24 +51,24 @@ class TestDeviceSettings(unittest.TestCa
 
                (L9['foo_set0'], L9['value'], Literal(0.1), ctx),
 
                (L9['foo'], L9['setting'], L9['foo_set1'], ctx),
 
                (L9['foo_set1'], L9['device'], L9['light1'], ctx),
 
                (L9['foo_set1'], L9['deviceAttr'], L9['speed'], ctx),
 
                (L9['foo_set1'], L9['scaledValue'], Literal(0.2), ctx),
 
            ]))
 
        s = DeviceSettings.fromResource(self.graph, L9['foo'])
 
        s = DeviceSettings.fromResource(self.graph, DeviceUri(L9['foo']))
 

	
 
        self.assertEqual(
 
            DeviceSettings(self.graph, [
 
                (L9['light1'], L9['brightness'], 0.1),
 
                (L9['light1'], L9['speed'], 0.2),
 
            ]), s)
 

	
 
    def testToVector(self):
 
        v = DeviceSettings(self.graph, [
 
            (DEV['aura1'], L9['rx'], 0.5),
 
            (DEV['aura1'], L9['color'], '#00ff00'),
 
            (DeviceUri(DEV['aura1']), DeviceAttr(L9['rx']), 0.5),
 
            (DeviceUri(DEV['aura1']), DeviceAttr(L9['color']), HexColor('#00ff00')),
 
        ]).toVector()
 
        self.assertEqual([
 
            0, 1, 0, 0.5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
 
            0, 0, 0, 0, 0, 0, 0, 0
 
        ], v)
 

	
 
@@ -75,20 +77,20 @@ class TestDeviceSettings(unittest.TestCa
 
            0, 1, 0, 0.5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
 
            0, 0, 0, 0, 0, 0, 0, 0
 
        ])
 

	
 
        self.assertEqual(
 
            DeviceSettings(self.graph, [
 
                (DEV['aura1'], L9['rx'], 0.5),
 
                (DEV['aura1'], L9['color'], '#00ff00'),
 
                (DeviceUri(DEV['aura1']), DeviceAttr(L9['rx']), 0.5),
 
                (DeviceUri(DEV['aura1']), DeviceAttr(L9['color']), HexColor('#00ff00')),
 
            ]), s)
 

	
 
    def testAsList(self):
 
        sets = [
 
            (L9['light1'], L9['attr2'], 0.3),
 
            (L9['light1'], L9['attr1'], 0.5),
 
            (DeviceUri(L9['light1']), DeviceAttr(L9['attr2']), cast(VTUnion,0.3)),
 
            (DeviceUri(L9['light1']), DeviceAttr(L9['attr1']), 0.5),
 
        ]
 
        self.assertCountEqual(sets, DeviceSettings(self.graph, sets).asList())
 

	
 
    def testDevices(self):
 
        s = DeviceSettings(self.graph, [
 
            (DEV['aura1'], L9['rx'], 0),
 
@@ -98,13 +100,13 @@ class TestDeviceSettings(unittest.TestCa
 
        self.assertCountEqual([DEV['aura2']], s.devices())
 

	
 
    def testAddStatements(self):
 
        s = DeviceSettings(self.graph, [
 
            (DEV['aura2'], L9['rx'], 0.1),
 
        ])
 
        stmts = s.statements(L9['foo'], L9['ctx1'], L9['s_'], set())
 
        stmts = s.statements(DeviceUri(L9['foo']), L9['ctx1'], L9['s_'], set())
 
        self.maxDiff = None
 
        setting = sorted(stmts)[-1][0]
 
        self.assertCountEqual([
 
            (L9['foo'], L9['setting'], setting, L9['ctx1']),
 
            (setting, L9['device'], DEV['aura2'], L9['ctx1']),
 
            (setting, L9['deviceAttr'], L9['rx'], L9['ctx1']),
 
@@ -154,11 +156,11 @@ class TestFromBlend(unittest.TestCase):
 
                (.2, DeviceSettings(self.graph, [(L1, ZOOM, 0.5)])),
 
                (.3, DeviceSettings(self.graph, [(L1, ZOOM, 1.0)])),
 
            ]))
 

	
 
    def testMixColors(self):
 
        self.assertEqual(
 
            DeviceSettings(self.graph, [(L1, ZOOM, '#503000')]),
 
            DeviceSettings(self.graph, [(L1, ZOOM, HexColor('#503000'))]),
 
            DeviceSettings.fromBlend(self.graph, [
 
                (.25, DeviceSettings(self.graph, [(L1, ZOOM, '#800000')])),
 
                (.5, DeviceSettings(self.graph, [(L1, ZOOM, '#606000')])),
 
                (.25, DeviceSettings(self.graph, [(L1, ZOOM, HexColor('#800000'))])),
 
                (.5, DeviceSettings(self.graph, [(L1, ZOOM, HexColor('#606000'))])),
 
            ]))
light9/effect/simple_outputs.py
Show inline comments
 
@@ -18,12 +18,15 @@ class SimpleOutputs:
 
        self.effectOutputs: Dict[URIRef, List[Tuple[URIRef, URIRef, Any, bool]]] = {}
 

	
 
        self.graph.addHandler(self.updateEffectsFromGraph)
 

	
 
    def updateEffectsFromGraph(self):
 
        for effect in self.graph.subjects(RDF.type, L9['Effect']):
 
            raise TypeError('change graph from Effect to EffectClass')
 

	
 
        for effect in self.graph.subjects(RDF.type, L9['EffectClass']):
 
            settings = []
 
            for setting in self.graph.objects(effect, L9['setting']):
 
                settingValues = dict(self.graph.predicate_objects(setting))
 
                try:
 
                    d = settingValues.get(L9['device'], None)
 
                    a = settingValues.get(L9['deviceAttr'], None)
light9/newtypes.py
Show inline comments
 
import decimal
 
from typing import Tuple, NewType, Type, TypeVar, Union, cast
 
from rdflib import URIRef
 
from rdflib.term import Node
 

	
 
ClientType = NewType('ClientType', str)
 
ClientSessionType = NewType('ClientSessionType', str)
 
@@ -8,12 +9,14 @@ Curve = NewType('Curve', URIRef)
 
OutputUri = NewType('OutputUri', URIRef)  # e.g. dmxA
 
DeviceUri = NewType('DeviceUri', URIRef)  # e.g. :aura2
 
DeviceClass = NewType('DeviceClass', URIRef)  # e.g. :Aura
 
DmxIndex = NewType('DmxIndex', int)  # 1..512
 
DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
 
DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
 
EffectClass = NewType('EffectClass', URIRef) # e.g. effect:chase
 
EffectAttr = NewType('EffectAttr', URIRef)  # e.g. :chaseSpeed
 
NoteUri = NewType('NoteUri', URIRef)
 
OutputAttr = NewType('OutputAttr', URIRef)  # e.g. :xFine
 
OutputValue = NewType('OutputValue', int)  # byte in dmx message
 
Song = NewType('Song', URIRef)
 
UnixTime = NewType('UnixTime', float)
 

	
 
@@ -42,8 +45,9 @@ def typedValue(objType: Type[_ObjType], 
 
    """graph.value(subj, pred) with a given return type. 
 
    If objType is not an rdflib.Node, we toPython() the value."""
 
    obj = graph.value(subj, pred)
 
    if obj is None:
 
        raise ValueError()
 
    conv = obj if _isSubclass2(objType, Node) else obj.toPython()
 
    # may need to turn Decimal to float here
 
    if objType is float and isinstance(conv, decimal.Decimal):
 
        conv = float(conv)
 
    return cast(objType, conv)
 
\ No newline at end of file
0 comments (0 inline, 0 general)