Changeset - 04612ba3fe45
[Not reviewed]
0 6 2 - 20 months ago 2023-05-20 03:55:28
8 files changed with 153 insertions and 128 deletions:
0 comments (0 inline, 0 general)
Show inline comments
from .note import Note
Show inline comments

import asyncio
import logging
import time
from typing import Callable, Coroutine, List, cast
from light9.effect.sequencer.sequencer import Note

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef

from light9.effect import effecteval
from light9.effect.sequencer import Note
from light9.effect.settings import DeviceSettings
from light9.effect.simple_outputs import SimpleOutputs
from light9.metrics import metrics
from light9.namespaces import L9, RDF
from light9.newtypes import NoteUri

log = logging.getLogger('sequencer')

class FaderEval:
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector
    The current faders become Notes in here, for more code reuse.
    def __init__(self,
                 graph: SyncedGraph,
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
        self.graph = graph
        self.sendToCollector = sendToCollector

        # Notes without times- always on
        self.notes: List[Note] = []

        self.simpleOutputs = SimpleOutputs(self.graph)
        self.lastLoopSucceeded = False

        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
'startupdating task')

    async def startUpdating(self):
Show inline comments
new file 100644
import bisect
import logging
import time
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import Literal, URIRef

from light9.namespaces import L9
from light9.newtypes import Curve, DeviceAttr, DeviceUri, NoteUri, typedValue

log = logging.getLogger('sequencer')


def pyType(n):
    ret = n.toPython()
    if isinstance(ret, Decimal):
        return float(ret)
    return ret


class Note(object):

    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs, timed=True):
        g = self.graph = graph
        self.uri = uri
        self.timed = timed
        self.effectEval = effectevalModule.EffectEval(graph, g.value(uri, L9['effectClass']), simpleOutputs)
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
        for s in g.objects(uri, L9['setting']):
            settingValues = dict(g.predicate_objects(s))
            ea = cast(URIRef, settingValues[L9['effectAttr']])
            self.baseEffectSettings[ea] = pyType(settingValues[L9['value']])

        if timed:

            def floatVal(s, p):
                return typedValue(float, g, s, p)

            originTime = floatVal(uri, L9['originTime'])
            self.points: List[Tuple[float, float]] = []
            for curve in g.objects(uri, L9['curve']):
                self.points.extend(self.getCurvePoints(cast(Curve, curve), L9['strength'], originTime))
            self.points = []

    def getCurvePoints(self, curve: Curve, attr, originTime: float) -> List[Tuple[float, float]]:
        points = []
        po = list(self.graph.predicate_objects(curve))
        if dict(po).get(L9['attr'], None) != attr:
            return []
        for point in [row[1] for row in po if row[0] == L9['point']]:
            po2 = dict(self.graph.predicate_objects(point))
            t = cast(Literal, po2[L9['time']]).toPython()
            if not isinstance(t, float):
                raise TypeError
            v = cast(Literal, po2[L9['value']]).toPython()
            if not isinstance(v, float):
                raise TypeError
            points.append((originTime + t, v))
        return points

    def activeAt(self, t: float) -> bool:
        return self.points[0][0] <= t <= self.points[-1][0]

    def evalCurve(self, t: float) -> float:
        i = bisect.bisect_left(self.points, (t, None)) - 1

        if i == -1:
            return self.points[0][1]
        if self.points[i][0] > t:
            return self.points[i][1]
        if i >= len(self.points) - 1:
            return self.points[i][1]

        p1, p2 = self.points[i], self.points[i + 1]
        frac = (t - p1[0]) / (p2[0] - p1[0])
        y = p1[1] + (p2[1] - p1[1]) * frac
        return y

    def outputSettings(self, t: float, strength: Optional[float] = None) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
        list of (device, attr, value), and a report for web
        if t is None:
            if self.timed:
                raise TypeError()
            t = time.time()  # so live effects will move
        report = {
            'note': str(self.uri),
            'effectClass': self.effectEval.effect,

        strengthAttr = cast(DeviceAttr, L9['strength'])

        effectSettings: Dict[DeviceAttr, Union[float, str]] = dict((DeviceAttr(da), v) for da, v in self.baseEffectSettings.items())
        effectSettings[strengthAttr] = self.evalCurve(t) if strength is None else strength

        def prettyFormat(x: Union[float, str]):
            if isinstance(x, float):
                return round(x, 4)
            return x

        report['effectSettings'] = dict((str(k), prettyFormat(v)) for k, v in sorted(effectSettings.items()))
        report['nonZero'] = cast(float, effectSettings[strengthAttr]) > 0
        startTime = self.points[0][0] if self.timed else 0
        out, evalReport = self.effectEval.outputFromEffect(
            # note: not using origin here since it's going away
            noteTime=t - startTime)
        report['devicesAffected'] = len(out.devices())
        return out, report
Show inline comments
copies from, which this should replace

import asyncio
from louie import dispatcher,All
import imp
import logging
import time
import traceback
from typing import Callable, Coroutine, Dict, List, cast

from louie import All, dispatcher
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.internet.inotify import INotify
from twisted.python.filepath import FilePath
import logging, bisect, time
import traceback
from decimal import Decimal
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, cast, Union

from light9.ascoltami.musictime_client import MusicTime
from light9.effect import effecteval
from light9.effect.sequencer import Note
from light9.effect.settings import DeviceSettings
from light9.effect.simple_outputs import SimpleOutputs
from light9.metrics import metrics
from light9.namespaces import L9, RDF
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from light9.metrics import metrics

import imp
from light9.newtypes import NoteUri, Song

log = logging.getLogger('sequencer')


class StateUpdate(All):

def pyType(n):
    ret = n.toPython()
    if isinstance(ret, Decimal):
        return float(ret)
    return ret


class Note(object):

    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule,
                 simpleOutputs, timed=True):
        g = self.graph = graph
        self.uri = uri
        self.timed= timed
        self.effectEval = effectevalModule.EffectEval(
            graph, g.value(uri, L9['effectClass']), simpleOutputs)
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
        for s in g.objects(uri, L9['setting']):
            settingValues = dict(g.predicate_objects(s))
            ea = settingValues[L9['effectAttr']]
            self.baseEffectSettings[ea] = pyType(settingValues[L9['value']])


        if timed:
            def floatVal(s, p):
                return float(g.value(s, p).toPython())

            originTime = floatVal(uri, L9['originTime'])
            self.points: List[Tuple[float, float]] = []
            for curve in g.objects(uri, L9['curve']):
                    self.getCurvePoints(cast(Curve, curve), L9['strength'], originTime))
            self.points = []

    def getCurvePoints(self, curve: Curve, attr,
                       originTime: float) -> List[Tuple[float, float]]:
        points = []
        po = list(self.graph.predicate_objects(curve))
        if dict(po).get(L9['attr'], None) != attr:
            return []
        for point in [row[1] for row in po if row[0] == L9['point']]:
            po2 = dict(self.graph.predicate_objects(point))
                (originTime + float(po2[L9['time']]), float(po2[L9['value']])))
        return points

    def activeAt(self, t: float) -> bool:
        return self.points[0][0] <= t <= self.points[-1][0]

    def evalCurve(self, t: float) -> float:
        i = bisect.bisect_left(self.points, (t, None)) - 1

        if i == -1:
            return self.points[0][1]
        if self.points[i][0] > t:
            return self.points[i][1]
        if i >= len(self.points) - 1:
            return self.points[i][1]

        p1, p2 = self.points[i], self.points[i + 1]
        frac = (t - p1[0]) / (p2[0] - p1[0])
        y = p1[1] + (p2[1] - p1[1]) * frac
        return y

    def outputSettings(
            t: float, strength: Optional[float] = None
            ) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
        list of (device, attr, value), and a report for web
        if t is None:
            if self.timed:
                raise TypeError()
            t = time.time() # so live effects will move
        report = {
            'note': str(self.uri),
            'effectClass': self.effectEval.effect,

        strengthAttr=cast(DeviceAttr, L9['strength'])

        effectSettings: Dict[DeviceAttr, Union[float, str]] = dict(
            (DeviceAttr(da), v) for da, v in self.baseEffectSettings.items())
        effectSettings[strengthAttr] = self.evalCurve(t) if strength is None else strength

        def prettyFormat(x: Union[float, str]):
            if isinstance(x, float):
                return round(x, 4)
            return x

        report['effectSettings'] = dict(
            (str(k), prettyFormat(v))
            for k, v in sorted(effectSettings.items()))
        report['nonZero'] = cast(float, effectSettings[strengthAttr]) > 0
        startTime = self.points[0][0] if self.timed else 0
        out, evalReport = self.effectEval.outputFromEffect(
            # note: not using origin here since it's going away
            noteTime=t - startTime)
        report['devicesAffected'] = len(out.devices())
        return out, report


class CodeWatcher(object):

    def __init__(self, onChange):
        self.onChange = onChange

        self.notifier = INotify()

    def codeChange(self, watch, path, mask):

        def go():
  "reload effecteval")

        # in case we got an event at the start of the write
        reactor.callLater(.1, go) # type: ignore


class Sequencer(object):
@@ -237,44 +130,44 @@ class Sequencer(object):
                    self.lastLoopSucceeded = True

                delay = max(0, 1 / self.fps - took)
                await asyncio.sleep(delay)

    async def update(self):
        with metrics('update_s0_getMusic').time():
            musicState = {'t':123.0,'song':''}
            if not musicState.get('song') or not isinstance(
                    musicState.get('t'), float):
            song = Song(URIRef(musicState['song']))
            # print('dispsend')
            # import pdb;pdb.set_trace()
                                'song': str(song),
                                't': musicState['t']

        with metrics('update_s1_eval').time():
            settings = []
            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
            songNotes = sorted(cast(List[Note], self.notes.get(song, [])), key=lambda n: n.uri)
            noteReports = []
            for note in songNotes:
                    s, report = note.outputSettings(musicState['t'])
                except Exception:
            devSettings = DeviceSettings.fromList(self.graph, settings)
        dispatcher.send(StateUpdate, update={'songNotes': noteReports})

        with metrics('update_s3_send').time():  # our measurement
            sendSecs = await self.sendToCollector(devSettings)

        # sendToCollector's own measurement.
        # (sometimes it's None, not sure why, and neither is mypy)
        #if isinstance(sendSecs, float):
        #    metrics('update_s3_send_client').observe(sendSecs)
Show inline comments
plays back effect notes from the timeline (and an untimed note from the faders)

import asyncio
import json
import logging
import time

from louie import dispatcher
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from light9 import networking
from light9.collector.collector_client_asyncio import sendToCollector
from light9.effect.sequencer.eval_faders import FaderEval
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
from light9.effect.settings import DeviceSettings
from light9.metrics import metrics
from light9.run_local import log
from louie import dispatcher
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.types import Receive, Scope, Send
from starlette_exporter import PrometheusMiddleware, handle_metrics


async def changes():
    state = {}
    q = asyncio.Queue()

    def onBroadcast(update):

    dispatcher.connect(onBroadcast, StateUpdate)

    lastSend = 0
    while True:
        await q.get()
        now = time.time()
        if now > lastSend + .2:
            lastSend = now
            yield json.dumps(state)


async def send_page_updates(request):
    return EventSourceResponse(changes())

Show inline comments
new file 100644

import asyncio
from light9.run_local import log


def test_import():

    async def go():
        # this sets up some watcher tasks
        from light9.effect.sequencer.service import app
 , debug=True)
\ No newline at end of file
Show inline comments
@@ -24,25 +24,26 @@ DeviceSetting = Tuple[DeviceUri, DeviceA
                      # currently, floats and hex color strings

# Alternate output range for a device. Instead of outputting 0.0 to
# 1.0, you can map that range into, say, 0.2 to 0.7
OutputRange = NewType('OutputRange', Tuple[float, float])

_ObjType = TypeVar('_ObjType')


def _isSubclass2(t1: Type, t2: Type) -> bool:
    """same as issubclass but t1 can be a NewType"""
    if hasattr(t1, '__supertype__'):
        t1 = t1.__supertype__
    return issubclass(t1, t2)


def typedValue(objType: Type[_ObjType], graph, subj, pred) -> _ObjType:
    """graph.value(subj, pred) with a given return type. 
    If objType is not an rdflib.Node, we toPython() the value."""
    obj = graph.value(subj, pred)
    if obj is None:
        raise ValueError()
    conv = obj if _isSubclass2(objType, Node) else obj.toPython()
    # may need to turn Decimal to float here
    return cast(objType, conv)
\ No newline at end of file
Show inline comments
import asyncio
from light9.run_local import log


def test_import():

    async def go():
        # this sets up some watcher tasks
        from service import app
        from light9.rdfdb.service import app
 , debug=True)
0 comments (0 inline, 0 general)