Files
@ 04612ba3fe45
Branch filter:
Location: light9/light9/effect/sequencer/note.py
04612ba3fe45
4.2 KiB
text/x-python
refactor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | import bisect
import logging
import time
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import Literal, URIRef
from light9.namespaces import L9
from light9.newtypes import Curve, DeviceAttr, DeviceUri, NoteUri, typedValue
log = logging.getLogger('sequencer')
def pyType(n):
ret = n.toPython()
if isinstance(ret, Decimal):
return float(ret)
return ret
class Note(object):
def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs, timed=True):
g = self.graph = graph
self.uri = uri
self.timed = timed
self.effectEval = effectevalModule.EffectEval(graph, g.value(uri, L9['effectClass']), simpleOutputs)
self.baseEffectSettings: Dict[URIRef, Any] = {} # {effectAttr: value}
for s in g.objects(uri, L9['setting']):
settingValues = dict(g.predicate_objects(s))
ea = cast(URIRef, settingValues[L9['effectAttr']])
self.baseEffectSettings[ea] = pyType(settingValues[L9['value']])
if timed:
def floatVal(s, p):
return typedValue(float, g, s, p)
originTime = floatVal(uri, L9['originTime'])
self.points: List[Tuple[float, float]] = []
for curve in g.objects(uri, L9['curve']):
self.points.extend(self.getCurvePoints(cast(Curve, curve), L9['strength'], originTime))
self.points.sort()
else:
self.points = []
def getCurvePoints(self, curve: Curve, attr, originTime: float) -> List[Tuple[float, float]]:
points = []
po = list(self.graph.predicate_objects(curve))
if dict(po).get(L9['attr'], None) != attr:
return []
for point in [row[1] for row in po if row[0] == L9['point']]:
po2 = dict(self.graph.predicate_objects(point))
t = cast(Literal, po2[L9['time']]).toPython()
if not isinstance(t, float):
raise TypeError
v = cast(Literal, po2[L9['value']]).toPython()
if not isinstance(v, float):
raise TypeError
points.append((originTime + t, v))
return points
def activeAt(self, t: float) -> bool:
return self.points[0][0] <= t <= self.points[-1][0]
def evalCurve(self, t: float) -> float:
i = bisect.bisect_left(self.points, (t, None)) - 1
if i == -1:
return self.points[0][1]
if self.points[i][0] > t:
return self.points[i][1]
if i >= len(self.points) - 1:
return self.points[i][1]
p1, p2 = self.points[i], self.points[i + 1]
frac = (t - p1[0]) / (p2[0] - p1[0])
y = p1[1] + (p2[1] - p1[1]) * frac
return y
def outputSettings(self, t: float, strength: Optional[float] = None) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
"""
list of (device, attr, value), and a report for web
"""
if t is None:
if self.timed:
raise TypeError()
t = time.time() # so live effects will move
report = {
'note': str(self.uri),
'effectClass': self.effectEval.effect,
}
strengthAttr = cast(DeviceAttr, L9['strength'])
effectSettings: Dict[DeviceAttr, Union[float, str]] = dict((DeviceAttr(da), v) for da, v in self.baseEffectSettings.items())
effectSettings[strengthAttr] = self.evalCurve(t) if strength is None else strength
def prettyFormat(x: Union[float, str]):
if isinstance(x, float):
return round(x, 4)
return x
report['effectSettings'] = dict((str(k), prettyFormat(v)) for k, v in sorted(effectSettings.items()))
report['nonZero'] = cast(float, effectSettings[strengthAttr]) > 0
startTime = self.points[0][0] if self.timed else 0
out, evalReport = self.effectEval.outputFromEffect(
list(effectSettings.items()),
songTime=t,
# note: not using origin here since it's going away
noteTime=t - startTime)
report['devicesAffected'] = len(out.devices())
return out, report
|