Files @ e683b449506b
Branch filter:

Location: light9/src/light9/blender/light_control/send_to_collector.py

drewp@bigasterisk.com
blender effect that sets lights to match blender lights
import asyncio
import logging
from dataclasses import dataclass, field
from os import sync
from typing import cast

import bpy  # type: ignore
from light9.blender.time_sync.time_from_graph import clamp
from light9.effect.effecteval import literalColor
from rdfdb.patch import Patch
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import RDF, ConjunctiveGraph, Literal, URIRef
from rdflib.graph import _ContextType

from light9 import networking
from light9.blender.asyncio_thread import startLoopInThread
from light9.namespaces import FUNC, L9
from light9.newtypes import DeviceUri
from light9.showconfig import showUri

# Module-level logger, registered under the name 'sendcoll'.
log = logging.getLogger('sendcoll')

# Seconds slept between passes of the color-sending loop in Sender.task.
UPDATE_PERIOD = 1 / 20


async def waitForConnection(graph: SyncedGraph):
    """Return once `graph` reports that it is connected.

    Workaround for patch() quietly failing before connection (see
    SyncedGraph): poll the graph's connection flag, sleeping half a second
    between checks, and only hand control back when it is true.
    """
    while True:
        if graph._isConnected():
            return
        await asyncio.sleep(.5)

@dataclass
class BlenderDev:
    """One Blender object paired with the light9 device it controls.

    Holds the URIs needed to describe the device's color setting in the
    graph, plus the most recently captured color of the Blender object.
    """
    ctx: URIRef  # graph context used for every quad this device writes
    obj: bpy.types.Object  # Blender object whose data.color is read
    uri: DeviceUri  # device named by the setting's L9['device'] statement
    setting: URIRef  # setting node that carries this device's color value
    devSets: URIRef  # node that points at `setting` via L9['setting']

    # Last color captured by updateColor(); black until the first capture.
    color: tuple[float, float, float] = (0, 0, 0)

    def updateColor(self):
        """Snapshot the Blender object's current color into `self.color`.

        The channels are copied into a plain tuple rather than keeping the
        object returned by `obj.data.color`, so later readers (makePatch)
        see the value as it was at capture time and never reach back into
        Blender-owned data.
        """
        r, g, b = self.obj.data.color
        self.color = (r, g, b)

    def effectGraphStmts(self, bset0) -> list:
        """Return the static quads that describe this device's color setting.

        `bset0` is accepted for interface compatibility but is not used; the
        statements are built from `self.devSets` instead.
        """
        return [
            (self.devSets, L9['setting'], self.setting, self.ctx),
            (self.setting, L9['device'], self.uri, self.ctx),
            (self.setting, L9['deviceAttr'], L9['color'], self.ctx),
        ]

    def makePatch(self, graph: SyncedGraph) -> Patch:
        """Build a patch setting this device's L9['value'] to the stored color.

        Each channel is clamped to the 0..1 range before being converted
        with literalColor.
        """
        channels = [clamp(x, 0, 1) for x in self.color]
        c = literalColor(*channels)
        return graph.getObjectPatch(self.ctx, self.setting, L9['value'], c)


@dataclass
class Sender:
    """Publishes the colors of tagged Blender objects into the rdfdb graph.

    Constructing a Sender schedules a scene scan on Blender's timer system
    and starts `task` through startLoopInThread. The scan records every
    object carrying an 'l9dev' custom property; Blender update handlers keep
    each record's color current; `task` repeatedly patches those colors into
    the graph.
    """
    # Blender object name -> device record built by findLightsInScene.
    syncedObjects: dict[str, BlenderDev] = field(default_factory=dict)

    # Unannotated, so these are shared class attributes, not dataclass fields.
    ctx = URIRef(showUri() + '/blender')
    devSets = L9['blenderControlDevSets']

    def __post_init__(self):
        """Schedule the scene scan and start the background sender task."""
        # findLightsInScene returns None, so the timer fires only once.
        bpy.app.timers.register(self.findLightsInScene, first_interval=0.1)  # todo: what event to use?
        startLoopInThread(self.task())

    async def task(self):
        """Connect to rdfdb, write the static effect graph, then send colors forever.

        A failure while sending one round of colors is logged and followed by
        an extra one-second pause; any other failure is logged and ends the
        task.
        """
        log.info('start task')
        try:
            self.graph = SyncedGraph(networking.rdfdb.url, "blender_light_control")
            await waitForConnection(self.graph)
            # NOTE(review): this uses whatever syncedObjects holds right now;
            # devices found by a later scan are not added to the static graph.
            self.patchInitialGraph()

            while True:
                try:
                    await self.patchCurrentLightColors()
                except Exception:
                    log.error('skip', exc_info=True)
                    await asyncio.sleep(1)
                # todo: this could be triggered by onColorChanges instead of running constantly
                await asyncio.sleep(UPDATE_PERIOD)
        except Exception:
            log.error('task', exc_info=True)

    def patchInitialGraph(self):
        """Replace the contents of `ctx` with the effect and per-device statements."""
        g = self.effectGraph()
        for d in self.syncedObjects.values():
            for stmt in d.effectGraphStmts(self.devSets):
                g.add(stmt)
        for stmt in g:
            log.info("adding %s", stmt)
        self.graph.patchSubgraph(self.ctx, g)

    async def patchCurrentLightColors(self):
        """Merge every device's color patch into one patch and send it if non-empty."""
        p = Patch(addQuads=[], delQuads=[])
        for d in self.syncedObjects.values():
            p = p.update(d.makePatch(self.graph))
        if p:
            await self.graph.patch(p)

    def effectGraph(self) -> ConjunctiveGraph:
        """Build the statements describing the L9['blenderControl'] effect.

        The effect gets one setting (blenderControl/set0) whose value is
        `devSets` and whose effectAttr is L9['deviceSettings'].
        """
        g = ConjunctiveGraph()
        ctx = cast(_ContextType, self.ctx)
        g.add((L9['blenderControl'], RDF.type, L9['Effect'], ctx))
        g.add((L9['blenderControl'], L9['effectFunction'], FUNC['scale'], ctx))
        g.add((L9['blenderControl'], L9['publishAttr'], L9['strength'], ctx))
        setting = L9['blenderControl/set0']
        g.add((L9['blenderControl'], L9['setting'], setting, ctx))
        g.add((setting, L9['value'], self.devSets, ctx))
        g.add((setting, L9['effectAttr'], L9['deviceSettings'], ctx))
        return g

    def findLightsInScene(self):
        """Rebuild `syncedObjects` from objects that carry an 'l9dev' property.

        Objects are visited in name order and each one gets its own setting
        node (blenderControl/set1, set2, ...), so the statements and color
        values of different devices never share a subject. A scene with a
        single tagged object still uses blenderControl/set1.
        """
        self.syncedObjects.clear()
        tagged = (o for o in sorted(bpy.data.objects, key=lambda x: x.name) if o.get('l9dev'))
        for index, obj in enumerate(tagged, start=1):
            dev = BlenderDev(
                self.ctx,
                obj,
                DeviceUri(URIRef(obj['l9dev'])),
                setting=L9[f'blenderControl/set{index}'],
                devSets=self.devSets,
            )
            self.syncedObjects[obj.name] = dev
            log.info(f'found {dev}')

        self.watchForUpdates()

    def watchForUpdates(self):
        """Register onColorChanges for depsgraph updates and frame changes.

        Safe to call repeatedly: a handler list that already contains the
        callback is left alone, so rescanning the scene does not stack
        duplicate handlers.
        """
        for handlers in (bpy.app.handlers.depsgraph_update_post, bpy.app.handlers.frame_change_post):
            if self.onColorChanges not in handlers:
                handlers.append(self.onColorChanges)

    def onColorChanges(self, scene, deps):
        """Refresh the stored color of every synced object listed in `deps`.

        `scene` is supplied by Blender's handler call and is not used.
        """
        for obj in deps.objects:
            dev = self.syncedObjects.get(obj.name)
            if dev is None:
                continue
            dev.updateColor()