Changeset - 44fc477970bf
[Not reviewed]
pdm.lock
Show inline comments
 
# This file is @generated by PDM.
 
# It is not intended for manual editing.
 

	
 
[metadata]
 
groups = ["default", "dev"]
 
strategy = ["cross_platform", "inherit_metadata"]
 
lock_version = "4.4.1"
 
content_hash = "sha256:5393d5c679935ba9f042f2b4f4d6efd58dbf03519b2e9f25e08f1e9d421e52f1"
 
content_hash = "sha256:6fac24ed6ab93fd328a74d22973a01c77d9de5b4a0b22cd111a884afd99f235f"
 

	
 
[[package]]
 
name = "aiohttp"
 
version = "3.9.5"
 
requires_python = ">=3.8"
 
summary = "Async http client/server framework (asyncio)"
 
groups = ["default", "dev"]
 
dependencies = [
 
    "aiosignal>=1.1.2",
 
    "attrs>=17.3.0",
 
    "frozenlist>=1.1.1",
 
    "multidict<7.0,>=4.5",
 
    "yarl<2.0,>=1.0",
 
]
 
files = [
 
    {file = "aiohttp-3.9.5-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:e0ae53e33ee7476dd3d1132f932eeb39bf6125083820049d06edcdca4381f342"},
 
    {file = "aiohttp-3.9.5-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:c088c4d70d21f8ca5c0b8b5403fe84a7bc8e024161febdd4ef04575ef35d474d"},
 
    {file = "aiohttp-3.9.5-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:639d0042b7670222f33b0028de6b4e2fad6451462ce7df2af8aee37dcac55424"},
 
    {file = "aiohttp-3.9.5-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f26383adb94da5e7fb388d441bf09c61e5e35f455a3217bfd790c6b6bc64b2ee"},
 
    {file = "aiohttp-3.9.5-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:66331d00fb28dc90aa606d9a54304af76b335ae204d1836f65797d6fe27f1ca2"},
 
    {file = "aiohttp-3.9.5-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4ff550491f5492ab5ed3533e76b8567f4b37bd2995e780a1f46bca2024223233"},
 
    {file = "aiohttp-3.9.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f22eb3a6c1080d862befa0a89c380b4dafce29dc6cd56083f630073d102eb595"},
 
    {file = "aiohttp-3.9.5-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a81b1143d42b66ffc40a441379387076243ef7b51019204fd3ec36b9f69e77d6"},
 
    {file = "aiohttp-3.9.5-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:f64fd07515dad67f24b6ea4a66ae2876c01031de91c93075b8093f07c0a2d93d"},
 
@@ -2294,60 +2294,47 @@ files = [
 
    {file = "yarl-1.9.4-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:09efe4615ada057ba2d30df871d2f668af661e971dfeedf0c159927d48bbeff0"},
 
    {file = "yarl-1.9.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:008d3e808d03ef28542372d01057fd09168419cdc8f848efe2804f894ae03e51"},
 
    {file = "yarl-1.9.4-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:6f5cb257bc2ec58f437da2b37a8cd48f666db96d47b8a3115c29f316313654ff"},
 
    {file = "yarl-1.9.4-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:992f18e0ea248ee03b5a6e8b3b4738850ae7dbb172cc41c966462801cbf62cf7"},
 
    {file = "yarl-1.9.4-cp312-cp312-musllinux_1_1_s390x.whl", hash = "sha256:0e9d124c191d5b881060a9e5060627694c3bdd1fe24c5eecc8d5d7d0eb6faabc"},
 
    {file = "yarl-1.9.4-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:3986b6f41ad22988e53d5778f91855dc0399b043fc8946d4f2e68af22ee9ff10"},
 
    {file = "yarl-1.9.4-cp312-cp312-win32.whl", hash = "sha256:4b21516d181cd77ebd06ce160ef8cc2a5e9ad35fb1c5930882baff5ac865eee7"},
 
    {file = "yarl-1.9.4-cp312-cp312-win_amd64.whl", hash = "sha256:a9bd00dc3bc395a662900f33f74feb3e757429e545d831eef5bb280252631984"},
 
    {file = "yarl-1.9.4-py3-none-any.whl", hash = "sha256:928cecb0ef9d5a7946eb6ff58417ad2fe9375762382f1bf5c55e61645f2c43ad"},
 
    {file = "yarl-1.9.4.tar.gz", hash = "sha256:566db86717cf8080b99b58b083b773a908ae40f06681e87e589a976faf8246bf"},
 
]
 

	
 
[[package]]
 
name = "zipp"
 
version = "3.18.1"
 
requires_python = ">=3.8"
 
summary = "Backport of pathlib-compatible object wrapper for zip files"
 
groups = ["dev"]
 
files = [
 
    {file = "zipp-3.18.1-py3-none-any.whl", hash = "sha256:206f5a15f2af3dbaee80769fb7dc6f249695e940acca08dfb2a4769fe61e538b"},
 
    {file = "zipp-3.18.1.tar.gz", hash = "sha256:2884ed22e7d8961de1c9a05142eb69a247f120291bc0206a00a7642f09b5b715"},
 
]
 

	
 
[[package]]
 
name = "zmq"
 
version = "0.0.0"
 
summary = "You are probably looking for pyzmq."
 
groups = ["default"]
 
dependencies = [
 
    "pyzmq",
 
]
 
files = [
 
    {file = "zmq-0.0.0.tar.gz", hash = "sha256:6b1a1de53338646e8c8405803cffb659e8eb7bb02fff4c9be62a7acfac8370c9"},
 
    {file = "zmq-0.0.0.zip", hash = "sha256:21cfc6be254c9bc25e4dabb8a3b2006a4227966b7b39a637426084c8dc6901f7"},
 
]
 

	
 
[[package]]
 
name = "zope-interface"
 
version = "6.3"
 
requires_python = ">=3.7"
 
summary = "Interfaces for Python"
 
groups = ["default"]
 
dependencies = [
 
    "setuptools",
 
]
 
files = [
 
    {file = "zope.interface-6.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:600101f43a7582d5b9504a7c629a1185a849ce65e60fca0f6968dfc4b76b6d39"},
 
    {file = "zope.interface-6.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4d6b229f5e1a6375f206455cc0a63a8e502ed190fe7eb15e94a312dc69d40299"},
 
    {file = "zope.interface-6.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:10cde8dc6b2fd6a1d0b5ca4be820063e46ddba417ab82bcf55afe2227337b130"},
 
    {file = "zope.interface-6.3-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:40aa8c8e964d47d713b226c5baf5f13cdf3a3169c7a2653163b17ff2e2334d10"},
 
    {file = "zope.interface-6.3-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d165d7774d558ea971cb867739fb334faf68fc4756a784e689e11efa3becd59e"},
 
    {file = "zope.interface-6.3-cp311-cp311-win_amd64.whl", hash = "sha256:69dedb790530c7ca5345899a1b4cb837cc53ba669051ea51e8c18f82f9389061"},
 
    {file = "zope.interface-6.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:8d407e0fd8015f6d5dfad481309638e1968d70e6644e0753f229154667dd6cd5"},
 
    {file = "zope.interface-6.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:72d5efecad16c619a97744a4f0b67ce1bcc88115aa82fcf1dc5be9bb403bcc0b"},
 
    {file = "zope.interface-6.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:567d54c06306f9c5b6826190628d66753b9f2b0422f4c02d7c6d2b97ebf0a24e"},
 
    {file = "zope.interface-6.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:483e118b1e075f1819b3c6ace082b9d7d3a6a5eb14b2b375f1b80a0868117920"},
 
    {file = "zope.interface-6.3-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2bb78c12c1ad3a20c0d981a043d133299117b6854f2e14893b156979ed4e1d2c"},
 
    {file = "zope.interface-6.3-cp312-cp312-win_amd64.whl", hash = "sha256:ad4524289d8dbd6fb5aa17aedb18f5643e7d48358f42c007a5ee51a2afc2a7c5"},
 
    {file = "zope.interface-6.3.tar.gz", hash = "sha256:f83d6b4b22262d9a826c3bd4b2fbfafe1d0000f085ef8e44cd1328eea274ae6a"},
 
]
pyproject.toml
Show inline comments
 
@@ -17,49 +17,48 @@ dependencies = [
 
    "flask==2.2.4", # workaround for pydmxcontrol
 
    "ipython>=8.13.2",
 
    "louie>=2.0",
 
    "moviepy>=1.0.3",
 
    "noise>=1.2.2",
 
    "prometheus-client>=0.14.1",
 
    "pydmxcontrol>=2.0.0",
 
    "PyGObject>=3.42.1",
 
    "python-dateutil>=2.8.2",
 
    "rdflib>=6.3.2",
 
    "requests>=2.30.0",
 
    "rx>=3.2.0",
 
    "sse-starlette>=0.10.3",
 
    "starlette-exporter>=0.12.0",
 
    "starlette>=0.27.0",
 
    "statprof>=0.1.2",
 
    "toposort>=1.10",
 
    "udmx-pyusb>=2.0.0",
 
    "uvicorn[standard]>=0.17.6",
 
    "watchdog>=2.1.7",
 
    "webcolors>=1.11.1",
 
    "scipy>=1.9.3",
 
    "braillegraph>=0.6",
 
    "tenacity>=8.2.2",
 
    "zmq>=0.0.0",
 
    "mido>=1.2.10",
 
    "alsa-midi>=1.0.1",
 
    "treq>=22.2.0",
 
    "light9 @ file:///${PROJECT_ROOT}/",
 
    "python-debouncer>=0.1.4",
 
    "pytest>=8.2.0",
 
    "avro>=1.11.3",
 
    "fastavro>=1.9.4",
 
    "yappi>=1.6.0",
 
]
 
requires-python = ">=3.11"
 

	
 
readme = "README.md"
 

	
 
[project.urls]
 
Homepage = "https://bigasterisk.com/light9/"
 

	
 
[tool.pdm]
 

	
 
[tool.pdm.dev-dependencies]
 
dev = [
 
    "coverage>=7.2.5",
 
    "flake8>=6.0.0",
 
    "freezegun>=1.2.2",
src/light9/collector/collector.py
Show inline comments
 
import logging
 
import time
 
from typing import Dict, List, Set, Tuple, cast
 
from light9.typedgraph import typedValue
 

	
 
from prometheus_client import Summary
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import URIRef
 

	
 
from light9.collector.device import resolve, toOutputAttrs
 
from light9.collector.output import Output as OutputInstance
 
from light9.collector.weblisteners import WebListeners
 
from light9.effect.settings import DeviceSettings
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
 
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, uriTail)
 
from light9.newtypes import (
 
    ClientSessionType,
 
    ClientType,
 
    DeviceAttr,
 
    DeviceClass,
 
    DeviceUri,
 
    DmxIndex,
 
    DmxMessageIndex,
 
    OutputAttr,
 
    OutputRange,
 
    OutputUri,
 
    OutputValue,
 
    UnixTime,
 
    VTUnion,
 
    uriTail,
 
)
 
from light9.typedgraph import typedValue
 

	
 
log = logging.getLogger('collector')
 

	
 
STAT_SETATTR = Summary('set_attr', 'setAttr calls')
 

	
 

	
 
def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
 
    return DmxMessageIndex(base + offset - 1)
 

	
 

	
 
def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
 
    """From rdf config graph, compute a map of
 
       (device, outputattr) : (output, index)
 
    that explains which output index to set for any device update.
 
    """
 
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
 

	
 
    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
 
        log.info('  mapping devices of class %s', dc)
 
        for dev in graph.subjects(RDF.type, dc):
 
            dev = cast(DeviceUri, dev)
 
            log.info('    💡 mapping device %s', dev)
 
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
 
            if universe not in outputs:
 
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
 
            try:
 
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
 
            except ValueError:
 
                raise ValueError('no :dmxBase for %s' % dev)
 

	
src/light9/collector/collector_client_asyncio.py
Show inline comments
 
import asyncio
 
import json
 
import logging
 
import time
 
from light9 import networking
 
from light9.effect.settings import DeviceSettings
 

	
 
import zmq.asyncio
 
from prometheus_client import Summary
 

	
 
from light9.effect.settings import DeviceSettings
 

	
 
log = logging.getLogger('coll_client')
 

	
 
ZMQ_SEND = Summary('zmq_send', 'calls')
 

	
 

	
 
def toCollectorJson(client, session, settings: DeviceSettings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
        'sendTime': time.time(),
 
    })
 

	
 

	
 
class _Sender:
 

	
 
    def __init__(self):
 
        self.context = zmq.asyncio.Context()
 
        self.socket = self.context.socket(zmq.PUB)
 
        self.socket.connect('tcp://127.0.0.1:9203')  #todo: tie to :collectorZmq in graph
 
        # old version used: 'tcp://%s:%s' % (service.host, service.port)
 

	
 
    @ZMQ_SEND.time()
src/light9/collector/device.py
Show inline comments
 
import logging
 
from typing import Dict, List, Any, TypeVar, cast
 
from light9.namespaces import L9
 
from typing import Dict, List, cast
 

	
 
import colormath.color_conversions
 
from colormath.color_objects import CMYColor, sRGBColor
 
from rdflib import Literal, URIRef
 
from webcolors import hex_to_rgb, rgb_to_hex
 
from colormath.color_objects import sRGBColor, CMYColor
 
import colormath.color_conversions
 
from light9.newtypes import VT, DeviceClass, HexColor, OutputAttr, OutputValue, DeviceUri, DeviceAttr, VTUnion
 

	
 
from light9.namespaces import L9
 
from light9.newtypes import (
 
    DeviceAttr,
 
    DeviceClass,
 
    HexColor,
 
    OutputAttr,
 
    OutputValue,
 
    VTUnion,
 
)
 

	
 
log = logging.getLogger('device')
 

	
 

	
 
class Device:
 
    pass
 

	
 

	
 
class ChauvetColorStrip(Device):
 
    """
 
     device attrs:
 
       color
 
    """
 

	
 

	
 
class Mini15(Device):
 
    """
 
    plan:
 

	
 
      device attrs
 
        rx, ry
 
        color
 
        gobo
 
        goboShake
 
        imageAim (configured with a file of calibration data)
 
    """
 

	
 

	
 
def clamp255(x):
 
    return min(255, max(0, x))
 

	
 

	
 
def _8bit(f):
 
    if not isinstance(f, (int, float)):
 
        raise TypeError(repr(f))
 
    return clamp255(int(f * 255))
 

	
 

	
 
def _maxColor(values: List[HexColor]) -> HexColor:
 
    rgbs = [hex_to_rgb(v) for v in values]
 
    maxes = [max(component) for component in zip(*rgbs)]
 
    return cast(HexColor, rgb_to_hex(tuple(maxes)))
 

	
 

	
 
def resolve(
 
        deviceType: DeviceClass,
 
        deviceAttr: DeviceAttr,
 
        values: List[VTUnion]) -> VTUnion:  # todo: return should be VT
 
def resolve(deviceType: DeviceClass, deviceAttr: DeviceAttr, values: List[VTUnion]) -> VTUnion:  # todo: return should be VT
 
    """
 
    return one value to use for this attr, given a set of them that
 
    have come in simultaneously. len(values) >= 1.
 

	
 
    bug: some callers are passing a device instance for 1st arg
 
    """
 
    if len(values) == 1:
 
        return values[0]
 
    if deviceAttr == DeviceAttr(L9['color']):
 
        return _maxColor(cast(List[HexColor], values))
 
    # incomplete. how-to-resolve should be on the DeviceAttr defs in the graph.
 
    if deviceAttr in map(DeviceAttr, [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]):
 
        floatVals = []
 
        for v in values:
 
            if isinstance(v, Literal):
 
                floatVals.append(float(v.toPython()))
 
            elif isinstance(v, (int, float)):
 
                floatVals.append(float(v))
 
            else:
 
                raise TypeError(repr(v))
 

	
 
        # averaging with zeros? not so good
 
        return sum(floatVals) / len(floatVals)
 
    return max(values)
 

	
 

	
 
def toOutputAttrs(
 
        deviceType: DeviceClass,
 
        deviceAttrSettings: Dict[DeviceAttr, VTUnion  # TODO
 
        deviceAttrSettings: Dict[
 
            DeviceAttr,
 
            VTUnion  # TODO
 
                                ]) -> Dict[OutputAttr, OutputValue]:
 
    return dict((OutputAttr(u), OutputValue(v)) for u, v in untype_toOutputAttrs(deviceType, deviceAttrSettings).items())
 

	
 

	
 
def untype_toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[URIRef, int]:
 
    """
 
    Given device attr settings like {L9['color']: Literal('#ff0000')},
 
    return a similar dict where the keys are output attrs (like
 
    L9['red']) and the values are suitable for Collector.setAttr
 

	
 
    :outputAttrRange happens before we get here.
 
    """
 

	
 
    def floatAttr(attr, default=0):
 
        out = deviceAttrSettings.get(attr)
 
        if out is None:
 
            return default
 
        return float(out.toPython()) if isinstance(out, Literal) else out
 

	
 
    def rgbAttr(attr):
 
        color = deviceAttrSettings.get(attr, '#000000')
 
        r, g, b = hex_to_rgb(color)
 
        return r, g, b
 

	
src/light9/collector/weblisteners.py
Show inline comments
 
import asyncio
 
import io
 
import json
 
import logging
 
import time
 
from typing import Any, Awaitable, Dict, List, Protocol, Tuple
 

	
 
import fastavro
 
from fastavro.schema import load_schema
 
from light9.collector.output import Output as OutputInstance
 
from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputUri, OutputValue)
 
import starlette.websockets
 
import websockets
 

	
 
log = logging.getLogger('weblisteners')
 

	
 

	
 
def shortenOutput(out: OutputUri) -> str:
 
    return str(out).rstrip('/').rsplit('/', 1)[-1]
 

	
 

	
 
class UiListener(Protocol):
 

	
 
    async def sendMessage(self, msg):
 
        ...
 

	
 

	
 
class WebListeners:
 

	
 
    def __init__(self) -> None:
 
        self.CollectorUpdateSchema = load_schema('avro/CollectorUpdate.avsc')
 
        self.clients: List[Tuple[UiListener, Dict[DeviceUri, Dict[OutputAttr, OutputValue]]]] = []
 
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
 
                                                                                                                                 DmxMessageIndex]]]] = {}
 
@@ -76,26 +74,25 @@ class WebListeners:
 
            # could lead to slowdowns in the real dmx output.
 
            for client, seen in self.clients:
 
                if seen.get(dev) == attrs:
 
                    continue
 
                if msg is None:
 
                    msg = self.makeMsg(dev, attrs, outputMap)
 

	
 
                seen[dev] = attrs
 
                sendAwaits.append(client.sendMessage(msg))
 
            await asyncio.gather(*sendAwaits)
 

	
 
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]):
 
        attrRows = []
 
        for attr, val in attrs.items():
 
            outputUri, bufIndex = outputMap[(dev, attr)]
 
            dmxIndex = DmxIndex(bufIndex + 1)
 
            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (shortenOutput(outputUri), dmxIndex)})
 
        attrRows.sort(key=lambda r: r['chan'])
 
        for row in attrRows:
 
            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
 

	
 
        out = io.BytesIO()
 
        fastavro.schemaless_writer(out, self.CollectorUpdateSchema, {'OutputAttrsSet': {'dev': dev, 'attrs': attrRows}})
 
        msg = out.getvalue()
 
        log.info(f'made update message {len(msg)=}')
 
        return msg
src/light9/effect/edit.py
Show inline comments
 
from rdflib import URIRef, Literal
 
import treq
 
from rdfdb.patch import Patch
 
from rdflib import Literal, URIRef
 
from twisted.internet.defer import inlineCallbacks, returnValue
 
import treq
 

	
 
from light9 import networking
 
from light9.curvecalc.curve import CurveResource
 
from light9.namespaces import L9, RDF, RDFS
 
from rdfdb.patch import Patch
 

	
 

	
 
def clamp(x, lo, hi):
 
    return max(lo, min(hi, x))
 

	
 

	
 
@inlineCallbacks
 
def getMusicStatus():
 
    resp = yield treq.get(networking.musicPlayer.path('time'), timeout=.5)
 
    body = yield resp.json_content()
 
    returnValue(body)
 

	
 

	
 
@inlineCallbacks
 
def songEffectPatch(graph, dropped, song, event, ctx):
 
    """
 
    some uri was 'dropped' in the curvecalc timeline. event is 'default' or 'start' or 'end'.
 
    """
 
    with graph.currentState(tripleFilter=(dropped, None, None)) as g:
 
        droppedTypes = list(g.objects(dropped, RDF.type))
 
        droppedLabel = g.label(dropped)
 
        droppedCodes = list(g.objects(dropped, L9['code']))
 

	
 
    quads = []
 
    fade = 2 if event == 'default' else 0
 

	
 
    if _songHasEffect(graph, song, dropped):
 
        # bump the existing curve
 
        pass
 
    else:
 
        effect, q = _newEffect(graph, song, ctx)
 
        quads.extend(q)
 

	
 
        curve = graph.sequentialUri(song + "/curve-")
 
        yield _newEnvelopeCurve(graph, ctx, curve, droppedLabel, fade)
 
        quads.extend([
 
            (song, L9['curve'], curve, ctx),
 
            (effect, RDFS.label, droppedLabel, ctx),
 
            (effect, L9['code'], Literal('env = %s' % curve.n3()), ctx),
 
        ])
 

	
 
        if L9['EffectClass'] in droppedTypes:
 
            quads.extend([
 
                (effect, RDF.type, dropped, ctx),
 
            ] + [(effect, L9['code'], c, ctx) for c in droppedCodes])
 
        elif L9['Submaster'] in droppedTypes:
 
            quads.extend([
 
                (effect, L9['code'], Literal('out = %s * env' % dropped.n3()),
 
                 ctx),
 
                (effect, L9['code'], Literal('out = %s * env' % dropped.n3()), ctx),
 
            ])
 
        else:
 
            raise NotImplementedError(
 
                "don't know how to add an effect from %r (types=%r)" %
 
                (dropped, droppedTypes))
 
            raise NotImplementedError("don't know how to add an effect from %r (types=%r)" % (dropped, droppedTypes))
 

	
 
        _maybeAddMusicLine(quads, effect, song, ctx)
 

	
 
    print("adding")
 
    for qq in quads:
 
        print(qq)
 
    returnValue(Patch(addQuads=quads))
 

	
 

	
 
@inlineCallbacks
 
def songNotePatch(graph, dropped, song, event, ctx, note=None):
 
    """
 
    drop into effectsequencer timeline
 

	
 
    ported from timeline.coffee makeNewNote
 
    """
 
    with graph.currentState(tripleFilter=(dropped, None, None)) as g:
 
        droppedTypes = list(g.objects(dropped, RDF.type))
 

	
 
    quads = []
 
    fade = 2 if event == 'default' else 0.1
 

	
 
    if note:
 
        musicStatus = yield getMusicStatus()
 
        songTime = musicStatus['t']
 
        _finishCurve(graph, note, quads, ctx, songTime)
 
    else:
 
        if L9['Effect'] in droppedTypes:
 
            musicStatus = yield getMusicStatus()
 
            songTime = musicStatus['t']
 
            note = _makeNote(graph, song, note, quads, ctx, dropped, songTime,
 
                             event, fade)
 
            note = _makeNote(graph, song, note, quads, ctx, dropped, songTime, event, fade)
 
        else:
 
            raise NotImplementedError
 

	
 
    returnValue((note, Patch(addQuads=quads)))
 

	
 

	
 
def _point(ctx, uri, t, v):
 
    return [(uri, L9['time'], Literal(round(t, 3)), ctx),
 
            (uri, L9['value'], Literal(round(v, 3)), ctx)]
 
    return [(uri, L9['time'], Literal(round(t, 3)), ctx), (uri, L9['value'], Literal(round(v, 3)), ctx)]
 

	
 

	
 
def _finishCurve(graph, note, quads, ctx, songTime):
 
    with graph.currentState() as g:
 
        origin = g.value(note, L9['originTime']).toPython()
 
        curve = g.value(note, L9['curve'])
 

	
 
    pt2 = graph.sequentialUri(curve + 'p')
 
    pt3 = graph.sequentialUri(curve + 'p')
 
    quads.extend([(curve, L9['point'], pt2, ctx)] +
 
                 _point(ctx, pt2, songTime - origin, 1) +
 
                 [(curve, L9['point'], pt3, ctx)] +
 
    quads.extend([(curve, L9['point'], pt2, ctx)] + _point(ctx, pt2, songTime - origin, 1) + [(curve, L9['point'], pt3, ctx)] +
 
                 _point(ctx, pt3, songTime - origin + .5, 0))
 

	
 

	
 
def _makeNote(graph, song, note, quads, ctx, dropped, songTime, event, fade):
 
    note = graph.sequentialUri(song + '/n')
 
    curve = graph.sequentialUri(note + 'c')
 
    quads.extend([
 
        (song, L9['note'], note, ctx),
 
        (note, RDF.type, L9['Note'], ctx),
 
        (note, L9['curve'], curve, ctx),
 
        (note, L9['effectClass'], dropped, ctx),
 
        (note, L9['originTime'], Literal(songTime), ctx),
 
        (curve, RDF.type, L9['Curve'], ctx),
 
        (curve, L9['attr'], L9['strength'], ctx),
 
    ])
 
    if event == 'default':
 
        coords = [(0 - fade, 0), (0, 1), (20, 1), (20 + fade, 0)]
 
    elif event == 'start':
 
        coords = [
 
            (0 - fade, 0),
 
            (0, 1),
 
        ]
 
    elif event == 'end':  # probably unused- goes to _finishCurve instead
 
        coords = [(20, 1), (20 + fade, 0)]
 
@@ -178,28 +171,26 @@ def _newEnvelopeCurve(graph, ctx, uri, l
 
@inlineCallbacks
 
def _insertEnvelopePoints(curve, fade=2):
 
    # wrong: we might not be adding to the currently-playing song.
 
    musicStatus = yield getMusicStatus()
 
    songTime = musicStatus['t']
 
    songDuration = musicStatus['duration']
 

	
 
    t1 = clamp(songTime - fade, .1, songDuration - .1 * 2) + fade
 
    t2 = clamp(songTime + 20, t1 + .1, songDuration)
 

	
 
    curve.insert_pt((t1 - fade, 0))
 
    curve.insert_pt((t1, 1))
 
    curve.insert_pt((t2, 1))
 
    curve.insert_pt((t2 + fade, 0))
 

	
 

	
 
def _maybeAddMusicLine(quads, effect, song, ctx):
 
    """
 
    add a line getting the current music into 'music' if any code might
 
    be mentioning that var
 
    """
 

	
 
    for spoc in quads:
 
        if spoc[1] == L9['code'] and 'music' in spoc[2]:
 
            quads.extend([(effect, L9['code'],
 
                           Literal('music = %s' % musicCurveForSong(song).n3()),
 
                           ctx)])
 
            quads.extend([(effect, L9['code'], Literal('music = %s' % musicCurveForSong(song).n3()), ctx)])
 
            break
src/light9/effect/effect_functions.py
Show inline comments
 
import logging
 
import random
 
from typing import cast
 

	
 
from PIL import Image
 
from webcolors import rgb_to_hex
 

	
 
from light9.effect.scale import scale
 
from light9.effect.settings import DeviceSettings
 
from light9.namespaces import L9
 
from light9.newtypes import HexColor
 

	
 
random.seed(0)
 

	
 
log = logging.getLogger('effectfunc')
 

	
 

	
 
def sample8(img, x, y, repeat=False):
 
def sample8(img, x, y, repeat=False) -> tuple[int, int, int]:
 
    if not (0 <= y < img.height):
 
        return (0, 0, 0)
 
    if 0 <= x < img.width:
 
        return img.getpixel((x, y))
 
    elif not repeat:
 
        return (0, 0, 0)
 
    else:
 
        return img.getpixel((x % img.width, y))
 

	
 

	
 
def effect_scale(strength: float, devs: DeviceSettings) -> DeviceSettings:
 
    out = []
 
    if strength != 0:
 
        for d, da, v in devs.asList():
 
            out.append((d, da, scale(v, strength)))
 
    return DeviceSettings(devs.graph, out)
 

	
 

	
 
def effect_strobe(
 
        songTime: float,  #
 
        strength: float,
 
        period: float,
 
        onTime: float,
 
        devs: DeviceSettings) -> DeviceSettings:
 
    if period == 0:
 
        scl = 0
 
    else:
 
        scl = strength if (songTime % period) < onTime else 0
 
    return effect_scale(scl, devs)
 

	
 

	
 
def effect_image(
 
    songTime: float,  #
 
    strength: float,
 
    period: float,
 
    image: Image.Image,
 
    devs: DeviceSettings,
 
) -> DeviceSettings:
 
    x = int((songTime / period) * image.width)
 
    out = []
 
    for y, (d, da, v) in enumerate(devs.asOrderedList()):
 
    for y, (d, da, v) in enumerate(devs.asList()):
 
        if da != L9['color']:
 
            continue
 
        color8 = sample8(image, x, y, repeat=True)
 
        color = rgb_to_hex(tuple(color8))
 
        out.append((d, da, scale(color, strength * v)))
 
    return DeviceSettings(devs.graph, out)
 
\ No newline at end of file
 
        color = HexColor(rgb_to_hex(color8))
 
        out.append((d, da, scale(color, strength * cast(float, v))))
 
    return DeviceSettings(devs.graph, out)
src/light9/effect/effecteval2.py
Show inline comments
 
import traceback
 
import inspect
 
import logging
 
import traceback
 
from dataclasses import dataclass
 
from typing import Callable, List, Optional
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import RDF
 
from rdflib.term import Node
 

	
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.effect.settings import DeviceSettings, EffectSettings
 
from light9.namespaces import L9
 
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, EffectFunction, EffectUri, VTUnion)
 
from light9.newtypes import (
 
    DeviceAttr,
 
    DeviceUri,
 
    EffectAttr,
 
    EffectFunction,
 
    EffectUri,
 
    VTUnion,
 
)
 
from light9.typedgraph import typedValue
 

	
 
log = logging.getLogger('effecteval')
 

	
 

	
 
@dataclass
 
class Config:
 
    effectFunction: EffectFunction
 
    esettings: EffectSettings
 
    devSettings: Optional[DeviceSettings]  # the EffectSettings :effectAttr :devSettings item, if there was one
 
    func: Callable
 
    funcArgs: List[inspect.Parameter]
 

	
 

	
 
@dataclass
 
class EffectEval2:
 
    """Runs one effect code to turn EffectSettings (e.g. strength) into DeviceSettings"""
 
    graph: SyncedGraph
 
    uri: EffectUri
 
    lib: EffectFunctionLibrary
 

	
 
    config: Optional[Config] = None
 

	
 
    def __post_init__(self):
src/light9/effect/scale.py
Show inline comments
 
import logging
 
from decimal import Decimal
 

	
 
from webcolors import hex_to_rgb, rgb_to_hex
 

	
 
from light9.newtypes import VTUnion
 

	
 
log = logging.getLogger('scale')
 

	
 

	
 
def scale(value: VTUnion, strength: float):
 
    if isinstance(value, Decimal):
 
        raise TypeError()
 

	
 
    if isinstance(value, str):
 
        if value[0] == '#':
 
            if strength == '#ffffff':
 
                return value
 
            r, g, b = hex_to_rgb(value)
 
            # if isinstance(strength, Literal):
 
            #     strength = strength.toPython()
 
            # if isinstance(strength, str):
 
            #     sr, sg, sb = [v / 255 for v in hex_to_rgb(strength)]
 
            if True:
 
                sr = sg = sb = strength
 
            return rgb_to_hex((int(r * sr), int(g * sg), int(b * sb)))
 
    elif isinstance(value, (int, float)):
 
        return value * strength
 

	
 
    raise NotImplementedError("%r,%r" % (value, strength))
src/light9/effect/sequencer/eval_faders.py
Show inline comments
 
import traceback
 
import logging
 
import time
 
import traceback
 
from dataclasses import dataclass
 
from typing import List, Optional, cast
 

	
 
from prometheus_client import Summary
 
from rdfdb import SyncedGraph
 
from rdflib import URIRef
 
from rdflib.term import Node
 

	
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.effect.effecteval2 import EffectEval2
 
from light9.effect.settings import DeviceSettings, EffectSettings
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import EffectAttr, EffectUri, UnixTime
 
from light9.typedgraph import typedValue
 

	
 
log = logging.getLogger('seq.fader')
 

	
 
COMPILE = Summary('compile_graph_fader', 'compile')
 
COMPUTE_ALL_FADERS = Summary('compute_all_faders', 'compile')
 

	
 

	
 
@dataclass
 
class Fader:
 
    graph: SyncedGraph
src/light9/effect/sequencer/service.py
Show inline comments
 
"""
 
plays back effect notes from the timeline (and an untimed note from the faders)
 
"""
 

	
 
import asyncio
 
import json
 
import logging
 
import time
 

	
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.applications import Starlette
 
from starlette.routing import Route
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
from light9 import networking
 
from light9.background_loop import loop_forever
 
from light9 import networking
 
from light9.collector.collector_client_asyncio import sendToCollector
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.effect.sequencer.eval_faders import FaderEval
 
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
 
from light9.run_local import log
 

	
 
RATE = 20
 

	
 

	
 
async def changes():
 
    state = {}
 
    q = asyncio.Queue()
 

	
 
    def onBroadcast(update):
 
        state.update(update)
 
        q.put_nowait(None)
 

	
 
    dispatcher.connect(onBroadcast, StateUpdate)
 

	
 
    lastSend = 0
 
    while True:
 
        await q.get()
 
        now = time.time()
 
        if now > lastSend + .2:
src/light9/effect/settings.py
Show inline comments
 
"""
 
Data structure and convertors for a table of (device,attr,value)
 
rows. These might be effect attrs ('strength'), device attrs ('rx'),
 
or output attrs (dmx channel).
 

	
 
BareSettings means (attr,value), no device.
 
"""
 
from __future__ import annotations
 

	
 
import decimal
 
import logging
 
from dataclasses import dataclass
 
from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple, cast
 

	
 
import numpy
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import Literal, URIRef
 

	
 
from light9.collector.device import resolve
 
from light9.localsyncedgraph import LocalSyncedGraph
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import (DeviceAttr, DeviceUri, EffectAttr, HexColor, VTUnion)
 
from light9.newtypes import DeviceAttr, DeviceUri, EffectAttr, HexColor, VTUnion
 

	
 
log = logging.getLogger('settings')
 

	
 

	
 
def parseHex(h):
 
    if h[0] != '#':
 
        raise ValueError(h)
 
    return [int(h[i:i + 2], 16) for i in (1, 3, 5)]
 

	
 

	
 
def parseHexNorm(h):
 
    return [x / 255 for x in parseHex(h)]
 

	
 

	
 
def toHex(rgbFloat: Sequence[float]) -> HexColor:
 
    assert len(rgbFloat) == 3
 
    scaled = (max(0, min(255, int(v * 255))) for v in rgbFloat)
 
    return HexColor('#%02x%02x%02x' % tuple(scaled))
 

	
 

	
 
def getVal(graph, subj):
 
    lit = graph.value(subj, L9['value']) or graph.value(subj, L9['scaledValue'])
 
    ret = lit.toPython()
 
    if isinstance(ret, decimal.Decimal):
0 comments (0 inline, 0 general)