Files @ 04612ba3fe45
Branch filter:

Location: light9/light9/effect/sequencer/note.py

drewp@bigasterisk.com
refactor
import bisect
import logging
import time
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import Literal, URIRef

from light9.namespaces import L9
from light9.newtypes import Curve, DeviceAttr, DeviceUri, NoteUri, typedValue

# Module-level logger; records go to the 'sequencer' logging channel.
log = logging.getLogger('sequencer')


def pyType(n):
    """Return ``n.toPython()``, with a ``Decimal`` result converted to ``float``.

    Any other converted value is returned unchanged.
    """
    value = n.toPython()
    return float(value) if isinstance(value, Decimal) else value


class Note(object):
    """A note read from the graph: an effect class, its base settings, and
    (for timed notes) a strength curve over time.

    Attributes:
        graph: the graph the note was loaded from.
        uri: the note's URI.
        timed: whether the note carries a time-based strength curve.
        effectEval: ``effectevalModule.EffectEval`` built for the note's
            ``L9['effectClass']``.
        baseEffectSettings: ``{effectAttr: value}`` from the note's
            ``L9['setting']`` nodes.
        points: sorted ``(time, strength)`` pairs; empty when ``timed`` is
            false or when no curve has ``L9['attr'] == L9['strength']``.
    """

    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs, timed=True):
        """Load the note's effect class, settings and strength curve points.

        Args:
            graph: graph to read from.
            uri: the note to load.
            effectevalModule: object providing ``EffectEval(graph, effectClass, simpleOutputs)``.
            simpleOutputs: passed through to ``EffectEval``.
            timed: when true, read ``L9['originTime']`` and the note's curves;
                otherwise the note has no points.
        """
        g = self.graph = graph
        self.uri = uri
        self.timed = timed
        self.effectEval = effectevalModule.EffectEval(graph, g.value(uri, L9['effectClass']), simpleOutputs)
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
        for s in g.objects(uri, L9['setting']):
            settingValues = dict(g.predicate_objects(s))
            ea = cast(URIRef, settingValues[L9['effectAttr']])
            self.baseEffectSettings[ea] = pyType(settingValues[L9['value']])

        # (absolute time, strength), kept sorted so evalCurve can bisect.
        self.points: List[Tuple[float, float]] = []
        if timed:
            originTime = typedValue(float, g, uri, L9['originTime'])
            for curve in g.objects(uri, L9['curve']):
                self.points.extend(self.getCurvePoints(cast(Curve, curve), L9['strength'], originTime))
            self.points.sort()

    def getCurvePoints(self, curve: Curve, attr, originTime: float) -> List[Tuple[float, float]]:
        """Return ``(originTime + time, value)`` for every point of ``curve``.

        Returns an empty list if the curve's ``L9['attr']`` is not ``attr``.
        The result is in graph order (unsorted).

        Raises:
            TypeError: if a point's time or value does not convert to a
                Python ``float``.
            KeyError: if a point lacks ``L9['time']`` or ``L9['value']``.
        """
        po = list(self.graph.predicate_objects(curve))
        if dict(po).get(L9['attr'], None) != attr:
            return []

        pointPred, timePred, valuePred = L9['point'], L9['time'], L9['value']
        points = []
        for pred, point in po:
            if not (pred == pointPred):
                continue
            po2 = dict(self.graph.predicate_objects(point))
            t = cast(Literal, po2[timePred]).toPython()
            if not isinstance(t, float):
                raise TypeError(f'curve point {point} time must be a float, got {t!r}')

            v = cast(Literal, po2[valuePred]).toPython()
            if not isinstance(v, float):
                raise TypeError(f'curve point {point} value must be a float, got {v!r}')
            points.append((originTime + t, v))
        return points

    def activeAt(self, t: float) -> bool:
        """True if ``t`` lies within the time span of the note's points.

        A note without points is never active.
        """
        if not self.points:
            return False
        return self.points[0][0] <= t <= self.points[-1][0]

    def evalCurve(self, t: float) -> float:
        """Linearly interpolate the strength curve at time ``t``.

        Before the first point the first value is returned; after the last
        point the last value is returned.

        Raises:
            IndexError: if the note has no points.
        """
        # -inf as the second element sorts before any real point at the same
        # time. (Using None there raises TypeError on Python 3 as soon as
        # ``t`` equals a point's time, because float and None don't order.)
        i = bisect.bisect_left(self.points, (t, float('-inf'))) - 1

        if i == -1:
            return self.points[0][1]
        if i >= len(self.points) - 1:
            return self.points[i][1]

        # bisect_left guarantees p1 time < t <= p2 time, so the divisor is
        # never zero.
        p1, p2 = self.points[i], self.points[i + 1]
        frac = (t - p1[0]) / (p2[0] - p1[0])
        return p1[1] + (p2[1] - p1[1]) * frac

    def outputSettings(self, t: Optional[float], strength: Optional[float] = None) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
        """
        list of (device, attr, value), and a report for web

        Args:
            t: time to evaluate at. May be None only for untimed notes, in
                which case the current wall-clock time is used.
            strength: if given, used as the strength setting instead of
                evaluating the curve at ``t``.

        Returns:
            ``(out, report)`` where ``out`` is the first result of
            ``effectEval.outputFromEffect`` and ``report`` is a dict with keys
            'note', 'effectClass', 'effectSettings', 'nonZero' and
            'devicesAffected'.

        Raises:
            TypeError: if ``t`` is None on a timed note.

        NOTE(review): ``out`` is used below as an object with ``.devices()``,
        so the declared list-of-tuples return type looks stale -- confirm
        against ``outputFromEffect``.
        """
        if t is None:
            if self.timed:
                raise TypeError('t is required for a timed note')
            t = time.time()  # so live effects will move
        report = {
            'note': str(self.uri),
            'effectClass': self.effectEval.effect,
        }

        strengthAttr = cast(DeviceAttr, L9['strength'])

        effectSettings: Dict[DeviceAttr, Union[float, str]] = {DeviceAttr(da): v for da, v in self.baseEffectSettings.items()}
        effectSettings[strengthAttr] = self.evalCurve(t) if strength is None else strength

        def prettyFormat(x: Union[float, str]):
            # Round floats so the report stays compact; pass strings through.
            if isinstance(x, float):
                return round(x, 4)
            return x

        report['effectSettings'] = {str(k): prettyFormat(v) for k, v in sorted(effectSettings.items())}
        report['nonZero'] = cast(float, effectSettings[strengthAttr]) > 0
        startTime = self.points[0][0] if self.timed else 0
        # The second result (the evaluator's own report) is not used here.
        out, _ = self.effectEval.outputFromEffect(
            list(effectSettings.items()),
            songTime=t,
            # note: not using origin here since it's going away
            noteTime=t - startTime)
        report['devicesAffected'] = len(out.devices())
        return out, report