Changeset - 44fc477970bf
[Not reviewed]
Show inline comments
# This file is @generated by PDM.
# It is not intended for manual editing.

groups = ["default", "dev"]
strategy = ["cross_platform", "inherit_metadata"]
lock_version = "4.4.1"
content_hash = "sha256:5393d5c679935ba9f042f2b4f4d6efd58dbf03519b2e9f25e08f1e9d421e52f1"
content_hash = "sha256:6fac24ed6ab93fd328a74d22973a01c77d9de5b4a0b22cd111a884afd99f235f"

name = "aiohttp"
version = "3.9.5"
requires_python = ">=3.8"
summary = "Async http client/server framework (asyncio)"
groups = ["default", "dev"]
dependencies = [
@@ -2306,37 +2306,24 @@ files = [
name = "zipp"
version = "3.18.1"
requires_python = ">=3.8"
summary = "Backport of pathlib-compatible object wrapper for zip files"
groups = ["dev"]
files = [
    {file = "zipp-3.18.1-py3-none-any.whl", hash = "sha256:206f5a15f2af3dbaee80769fb7dc6f249695e940acca08dfb2a4769fe61e538b"},
    {file = "zipp-3.18.1.tar.gz", hash = "sha256:2884ed22e7d8961de1c9a05142eb69a247f120291bc0206a00a7642f09b5b715"},

name = "zmq"
version = "0.0.0"
summary = "You are probably looking for pyzmq."
groups = ["default"]
dependencies = [
files = [
    {file = "zmq-0.0.0.tar.gz", hash = "sha256:6b1a1de53338646e8c8405803cffb659e8eb7bb02fff4c9be62a7acfac8370c9"},
    {file = "", hash = "sha256:21cfc6be254c9bc25e4dabb8a3b2006a4227966b7b39a637426084c8dc6901f7"},

name = "zope-interface"
version = "6.3"
requires_python = ">=3.7"
summary = "Interfaces for Python"
groups = ["default"]
dependencies = [
files = [
    {file = "zope.interface-6.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:600101f43a7582d5b9504a7c629a1185a849ce65e60fca0f6968dfc4b76b6d39"},
    {file = "zope.interface-6.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4d6b229f5e1a6375f206455cc0a63a8e502ed190fe7eb15e94a312dc69d40299"},
    {file = "zope.interface-6.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:10cde8dc6b2fd6a1d0b5ca4be820063e46ddba417ab82bcf55afe2227337b130"},
Show inline comments
@@ -29,25 +29,24 @@ dependencies = [
    "light9 @ file:///${PROJECT_ROOT}/",
requires-python = ">=3.11"

Show inline comments
import logging
import time
from typing import Dict, List, Set, Tuple, cast
from light9.typedgraph import typedValue

from prometheus_client import Summary
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef

from light9.collector.device import resolve, toOutputAttrs
from light9.collector.output import Output as OutputInstance
from light9.collector.weblisteners import WebListeners
from light9.effect.settings import DeviceSettings
from light9.namespaces import L9, RDF
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, uriTail)
from light9.newtypes import (
from light9.typedgraph import typedValue

log = logging.getLogger('collector')

STAT_SETATTR = Summary('set_attr', 'setAttr calls')


def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
    return DmxMessageIndex(base + offset - 1)


def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
    """From rdf config graph, compute a map of
       (device, outputattr) : (output, index)
    that explains which output index to set for any device update.
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})

    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
Show inline comments
import asyncio
import json
import logging
import time
from light9 import networking
from light9.effect.settings import DeviceSettings

import zmq.asyncio
from prometheus_client import Summary

from light9.effect.settings import DeviceSettings

log = logging.getLogger('coll_client')

ZMQ_SEND = Summary('zmq_send', 'calls')


def toCollectorJson(client, session, settings: DeviceSettings) -> str:
    assert isinstance(settings, DeviceSettings)
    return json.dumps({
        'settings': settings.asList(),
        'client': client,
        'clientSession': session,
        'sendTime': time.time(),
Show inline comments
import logging
from typing import Dict, List, Any, TypeVar, cast
from light9.namespaces import L9
from typing import Dict, List, cast

import colormath.color_conversions
from colormath.color_objects import CMYColor, sRGBColor
from rdflib import Literal, URIRef
from webcolors import hex_to_rgb, rgb_to_hex
from colormath.color_objects import sRGBColor, CMYColor
import colormath.color_conversions
from light9.newtypes import VT, DeviceClass, HexColor, OutputAttr, OutputValue, DeviceUri, DeviceAttr, VTUnion

from light9.namespaces import L9
from light9.newtypes import (

log = logging.getLogger('device')


class Device:


class ChauvetColorStrip(Device):
     device attrs:
@@ -41,28 +50,25 @@ def clamp255(x):
def _8bit(f):
    if not isinstance(f, (int, float)):
        raise TypeError(repr(f))
    return clamp255(int(f * 255))


def _maxColor(values: List[HexColor]) -> HexColor:
    rgbs = [hex_to_rgb(v) for v in values]
    maxes = [max(component) for component in zip(*rgbs)]
    return cast(HexColor, rgb_to_hex(tuple(maxes)))


def resolve(
        deviceType: DeviceClass,
        deviceAttr: DeviceAttr,
        values: List[VTUnion]) -> VTUnion:  # todo: return should be VT
def resolve(deviceType: DeviceClass, deviceAttr: DeviceAttr, values: List[VTUnion]) -> VTUnion:  # todo: return should be VT
    return one value to use for this attr, given a set of them that
    have come in simultaneously. len(values) >= 1.

    bug: some callers are passing a device instance for 1st arg
    if len(values) == 1:
        return values[0]
    if deviceAttr == DeviceAttr(L9['color']):
        return _maxColor(cast(List[HexColor], values))
    # incomplete. how-to-resolve should be on the DeviceAttr defs in the graph.
    if deviceAttr in map(DeviceAttr, [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]):
@@ -73,26 +79,28 @@ def resolve(
            elif isinstance(v, (int, float)):
                raise TypeError(repr(v))

        # averaging with zeros? not so good
        return sum(floatVals) / len(floatVals)
    return max(values)


def toOutputAttrs(
        deviceType: DeviceClass,
        deviceAttrSettings: Dict[DeviceAttr, VTUnion  # TODO
                                ]) -> Dict[OutputAttr, OutputValue]:
        deviceAttrSettings: Dict[
            VTUnion  # TODO
        ]) -> Dict[OutputAttr, OutputValue]:
    return dict((OutputAttr(u), OutputValue(v)) for u, v in untype_toOutputAttrs(deviceType, deviceAttrSettings).items())


def untype_toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[URIRef, int]:
    Given device attr settings like {L9['color']: Literal('#ff0000')},
    return a similar dict where the keys are output attrs (like
    L9['red']) and the values are suitable for Collector.setAttr

    :outputAttrRange happens before we get here.

Show inline comments
import asyncio
import io
import json
import logging
import time
from typing import Any, Awaitable, Dict, List, Protocol, Tuple

import fastavro
from fastavro.schema import load_schema
from light9.collector.output import Output as OutputInstance
from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputUri, OutputValue)
import starlette.websockets
import websockets

log = logging.getLogger('weblisteners')


def shortenOutput(out: OutputUri) -> str:
    return str(out).rstrip('/').rsplit('/', 1)[-1]


class UiListener(Protocol):
@@ -88,14 +86,13 @@ class WebListeners:
        attrRows = []
        for attr, val in attrs.items():
            outputUri, bufIndex = outputMap[(dev, attr)]
            dmxIndex = DmxIndex(bufIndex + 1)
            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (shortenOutput(outputUri), dmxIndex)})
        attrRows.sort(key=lambda r: r['chan'])
        for row in attrRows:
            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])

        out = io.BytesIO()
        fastavro.schemaless_writer(out, self.CollectorUpdateSchema, {'OutputAttrsSet': {'dev': dev, 'attrs': attrRows}})
        msg = out.getvalue()
'made update message {len(msg)=}')
        return msg
Show inline comments
from rdflib import URIRef, Literal
import treq
from rdfdb.patch import Patch
from rdflib import Literal, URIRef
from twisted.internet.defer import inlineCallbacks, returnValue
import treq

from light9 import networking
from light9.curvecalc.curve import CurveResource
from light9.namespaces import L9, RDF, RDFS
from rdfdb.patch import Patch


def clamp(x, lo, hi):
    return max(lo, min(hi, x))


def getMusicStatus():
    resp = yield treq.get(networking.musicPlayer.path('time'), timeout=.5)
    body = yield resp.json_content()

@@ -44,31 +44,28 @@ def songEffectPatch(graph, dropped, song
            (song, L9['curve'], curve, ctx),
            (effect, RDFS.label, droppedLabel, ctx),
            (effect, L9['code'], Literal('env = %s' % curve.n3()), ctx),

        if L9['EffectClass'] in droppedTypes:
                (effect, RDF.type, dropped, ctx),
            ] + [(effect, L9['code'], c, ctx) for c in droppedCodes])
        elif L9['Submaster'] in droppedTypes:
                (effect, L9['code'], Literal('out = %s * env' % dropped.n3()),
                (effect, L9['code'], Literal('out = %s * env' % dropped.n3()), ctx),
            raise NotImplementedError(
                "don't know how to add an effect from %r (types=%r)" %
                (dropped, droppedTypes))
            raise NotImplementedError("don't know how to add an effect from %r (types=%r)" % (dropped, droppedTypes))

        _maybeAddMusicLine(quads, effect, song, ctx)

    for qq in quads:


def songNotePatch(graph, dropped, song, event, ctx, note=None):
@@ -81,47 +78,43 @@ def songNotePatch(graph, dropped, song, 

    quads = []
    fade = 2 if event == 'default' else 0.1

    if note:
        musicStatus = yield getMusicStatus()
        songTime = musicStatus['t']
        _finishCurve(graph, note, quads, ctx, songTime)
        if L9['Effect'] in droppedTypes:
            musicStatus = yield getMusicStatus()
            songTime = musicStatus['t']
            note = _makeNote(graph, song, note, quads, ctx, dropped, songTime,
                             event, fade)
            note = _makeNote(graph, song, note, quads, ctx, dropped, songTime, event, fade)
            raise NotImplementedError

    returnValue((note, Patch(addQuads=quads)))


def _point(ctx, uri, t, v):
    return [(uri, L9['time'], Literal(round(t, 3)), ctx),
            (uri, L9['value'], Literal(round(v, 3)), ctx)]
    return [(uri, L9['time'], Literal(round(t, 3)), ctx), (uri, L9['value'], Literal(round(v, 3)), ctx)]


def _finishCurve(graph, note, quads, ctx, songTime):
    with graph.currentState() as g:
        origin = g.value(note, L9['originTime']).toPython()
        curve = g.value(note, L9['curve'])

    pt2 = graph.sequentialUri(curve + 'p')
    pt3 = graph.sequentialUri(curve + 'p')
    quads.extend([(curve, L9['point'], pt2, ctx)] +
                 _point(ctx, pt2, songTime - origin, 1) +
                 [(curve, L9['point'], pt3, ctx)] +
    quads.extend([(curve, L9['point'], pt2, ctx)] + _point(ctx, pt2, songTime - origin, 1) + [(curve, L9['point'], pt3, ctx)] +
                 _point(ctx, pt3, songTime - origin + .5, 0))


def _makeNote(graph, song, note, quads, ctx, dropped, songTime, event, fade):
    note = graph.sequentialUri(song + '/n')
    curve = graph.sequentialUri(note + 'c')
        (song, L9['note'], note, ctx),
        (note, RDF.type, L9['Note'], ctx),
        (note, L9['curve'], curve, ctx),
        (note, L9['effectClass'], dropped, ctx),
        (note, L9['originTime'], Literal(songTime), ctx),
@@ -190,16 +183,14 @@ def _insertEnvelopePoints(curve, fade=2)
    curve.insert_pt((t2, 1))
    curve.insert_pt((t2 + fade, 0))


def _maybeAddMusicLine(quads, effect, song, ctx):
    add a line getting the current music into 'music' if any code might
    be mentioning that var

    for spoc in quads:
        if spoc[1] == L9['code'] and 'music' in spoc[2]:
            quads.extend([(effect, L9['code'],
                           Literal('music = %s' % musicCurveForSong(song).n3()),
            quads.extend([(effect, L9['code'], Literal('music = %s' % musicCurveForSong(song).n3()), ctx)])
Show inline comments
import logging
import random
from typing import cast

from PIL import Image
from webcolors import rgb_to_hex

from light9.effect.scale import scale
from light9.effect.settings import DeviceSettings
from light9.namespaces import L9
from light9.newtypes import HexColor


log = logging.getLogger('effectfunc')


def sample8(img, x, y, repeat=False):
def sample8(img, x, y, repeat=False) -> tuple[int, int, int]:
    if not (0 <= y < img.height):
        return (0, 0, 0)
    if 0 <= x < img.width:
        return img.getpixel((x, y))
    elif not repeat:
        return (0, 0, 0)
        return img.getpixel((x % img.width, y))


def effect_scale(strength: float, devs: DeviceSettings) -> DeviceSettings:
    out = []
@@ -45,19 +47,19 @@ def effect_strobe(
    return effect_scale(scl, devs)


def effect_image(
    songTime: float,  #
    strength: float,
    period: float,
    image: Image.Image,
    devs: DeviceSettings,
) -> DeviceSettings:
    x = int((songTime / period) * image.width)
    out = []
    for y, (d, da, v) in enumerate(devs.asOrderedList()):
    for y, (d, da, v) in enumerate(devs.asList()):
        if da != L9['color']:
        color8 = sample8(image, x, y, repeat=True)
        color = rgb_to_hex(tuple(color8))
        out.append((d, da, scale(color, strength * v)))
    return DeviceSettings(devs.graph, out)
\ No newline at end of file
        color = HexColor(rgb_to_hex(color8))
        out.append((d, da, scale(color, strength * cast(float, v))))
    return DeviceSettings(devs.graph, out)
Show inline comments
import traceback
import inspect
import logging
import traceback
from dataclasses import dataclass
from typing import Callable, List, Optional

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import RDF
from rdflib.term import Node

from light9.effect.effect_function_library import EffectFunctionLibrary
from light9.effect.settings import DeviceSettings, EffectSettings
from light9.namespaces import L9
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, EffectFunction, EffectUri, VTUnion)
from light9.newtypes import (
from light9.typedgraph import typedValue

log = logging.getLogger('effecteval')


class Config:
    effectFunction: EffectFunction
    esettings: EffectSettings
    devSettings: Optional[DeviceSettings]  # the EffectSettings :effectAttr :devSettings item, if there was one
    func: Callable
    funcArgs: List[inspect.Parameter]
@@ -83,25 +90,25 @@ class EffectEval2:
        calls our function using inputs (publishedAttr attrs, e.g. :strength)
        and effect-level settings including a special attr called :deviceSettings
        with DeviceSettings as its value
        if self.config is None:
            return DeviceSettings(self.graph, [])

        c = self.config
        kw = {}
        for arg in c.funcArgs:
            if arg.annotation == DeviceSettings:
                v = c.devSettings
                if v is None: # asked for ds but we have none
                if v is None:  # asked for ds but we have none
                    log.debug("%s asked for devs but we have none in config", self.uri)
                    return DeviceSettings(self.graph, [])
            elif == 'songTime':
                v = songTime
                eaForName = EffectAttr(L9[])
                v = self._getEffectAttrValue(eaForName, inputs)

            kw[] = v

        if False and log.isEnabledFor(logging.DEBUG):
            log.debug('calling %s with %s', c.func, kw)
Show inline comments
import logging
from decimal import Decimal

from webcolors import hex_to_rgb, rgb_to_hex

from light9.newtypes import VTUnion

log = logging.getLogger('scale')


def scale(value: VTUnion, strength: float):
    if isinstance(value, Decimal):
        raise TypeError()

    if isinstance(value, str):
        if value[0] == '#':
            if strength == '#ffffff':
                return value
            r, g, b = hex_to_rgb(value)
            # if isinstance(strength, Literal):
            #     strength = strength.toPython()
Show inline comments
import traceback
import logging
import time
import traceback
from dataclasses import dataclass
from typing import List, Optional, cast

from prometheus_client import Summary
from rdfdb import SyncedGraph
from rdflib import URIRef
from rdflib.term import Node

from light9.effect.effect_function_library import EffectFunctionLibrary
from light9.effect.effecteval2 import EffectEval2
from light9.effect.settings import DeviceSettings, EffectSettings
from light9.namespaces import L9, RDF
Show inline comments
@@ -5,26 +5,26 @@ plays back effect notes from the timelin
import asyncio
import json
import logging
import time

from louie import dispatcher
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from light9 import networking
from light9.background_loop import loop_forever
from light9 import networking
from light9.collector.collector_client_asyncio import sendToCollector
from light9.effect.effect_function_library import EffectFunctionLibrary
from light9.effect.sequencer.eval_faders import FaderEval
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
from light9.run_local import log

RATE = 20


async def changes():
    state = {}
    q = asyncio.Queue()
@@ -42,28 +42,28 @@ async def changes():
        if now > lastSend + .2:
            lastSend = now
            yield json.dumps(state)


async def send_page_updates(request):
    return EventSourceResponse(changes())


def main():
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")



    # seq = Sequencer(graph, send)  # per-song timed notes
    lib = EffectFunctionLibrary(graph)
    faders = FaderEval(graph, lib)  # bin/fade's untimed effects

    #@metrics('computeAndSend').time() # needs rework with async
    async def update(first_run):
        ds = faders.computeOutput()
        await sendToCollector('effectSequencer', session='0', settings=ds)

Show inline comments
@@ -10,25 +10,25 @@ from __future__ import annotations
import decimal
import logging
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple, cast

import numpy
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import Literal, URIRef

from light9.collector.device import resolve
from light9.localsyncedgraph import LocalSyncedGraph
from light9.namespaces import L9, RDF
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, HexColor, VTUnion)
from light9.newtypes import DeviceAttr, DeviceUri, EffectAttr, HexColor, VTUnion

log = logging.getLogger('settings')


def parseHex(h):
    if h[0] != '#':
        raise ValueError(h)
    return [int(h[i:i + 2], 16) for i in (1, 3, 5)]


def parseHexNorm(h):
    return [x / 255 for x in parseHex(h)]
0 comments (0 inline, 0 general)