Changeset - f79fff92990b
0 4 0 - 20 months ago 2023-05-21 02:20:52
collector.output use asyncio loop, not twisted loop. other cleanups.
4 files changed with 36 insertions and 20 deletions:
import logging
import time
from typing import Dict, List, Set, Tuple, cast

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef

from light9.collector.device import resolve, toOutputAttrs
from light9.collector.output import Output as OutputInstance
from light9.collector.weblisteners import WebListeners
from light9.effect.settings import DeviceSettings
from light9.namespaces import L9, RDF
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, typedValue)

log = logging.getLogger('collector')


def uriTail(u: URIRef) -> str:
    tail = u.rstrip('/').rsplit('/', 1)[1]
    if not tail:
        tail = str(u)
    return tail


def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
    return DmxMessageIndex(base + offset - 1)


def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
    """From rdf config graph, compute a map of
       (device, outputattr) : (output, index)
    that explains which output index to set for any device update.
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})

    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
'mapping DeviceClass %s', dc)
        for dev in graph.subjects(RDF.type, dc):
            dev = cast(DeviceUri, dev)
  '  💡 mapping device %s', dev)
  '  💡 mapping device %s', dev)
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
            if universe not in outputs:
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
            except ValueError:
                raise ValueError('no :dmxBase for %s' % dev)

            for row in graph.objects(dc, L9['attr']):
            for row in sorted(graph.objects(dc, L9['attr']), key=str):
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
                index = makeDmxMessageIndex(dmxBase, offset)
                ret[(dev, outputAttr)] = (universe, index)
                '      {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}')
      '      {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}')
    return ret


class Collector:
    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""
    """receives setAttrs calls; combines settings; renders them into what outputs like; calls Output.update"""

    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
        self.graph = graph
        self.outputs = outputs
        self.listeners = listeners
        self.clientTimeoutSec = clientTimeoutSec
        self.initTime = time.time()
        self.allDevices: Set[DeviceUri] = set()
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}


        # rename to activeSessons ?
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}

        # (dev, devAttr): value to use instead of 0
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}

    def _rebuildOutputMap(self):

        for out in self.outputs:
            self._outputByUri[OutputUri(out.uri)] = out
@@ -121,49 +130,49 @@ class Collector:
        for _, lastSettings in lastRequests:
            for (device, deviceAttr), value in lastSettings.items():
                if (device, deviceAttr) in self.remapOut:
                    start, end = self.remapOut[(device, deviceAttr)]
                    value = start + float(value) * (end - start)

                attrs = deviceAttrs.setdefault(device, {})
                if deviceAttr in attrs:
                    value = resolve(device, deviceAttr, [attrs[deviceAttr], value])
                attrs[deviceAttr] = value
                # list should come from the graph. these are attrs
                # that should default to holding the last position,
                # not going to 0.
                if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus']]:
                    self.stickyAttrs[(device, deviceAttr)] = cast(float, value)

        # e.g. don't let an unspecified rotation go to 0
        for (d, da), v in self.stickyAttrs.items():
            daDict = deviceAttrs.setdefault(d, {})
            if da not in daDict:
                daDict[da] = v

        return deviceAttrs

    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[DeviceSetting], sendTime: UnixTime):
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime):
        settings is a list of (device, attr, value). These attrs are
        device attrs. We resolve conflicting values, process them into
        output attrs, and call Output.update to send the new outputs.

        client is a string naming the type of client. (client,
        clientSession) is a unique client instance.

        Each client session's last settings will be forgotten after
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
        clientSession = ClientSessionType("no_longer_used")

        now = UnixTime(time.time())
        self._warnOnLateRequests(client, now, sendTime)


        self._acceptNewClientSessionSettings(client, clientSession, settings, now)

        deviceAttrs = self._merge(iter(self.lastRequest.values()))

        outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {})
import asyncio
from typing import cast
from rdflib import URIRef
import socket
import struct
import time
import usb.core
import logging
from twisted.internet import threads, reactor, task
from twisted.internet.interfaces import IReactorCore, IReactorTime
from light9.metrics import metrics

log = logging.getLogger('output')
logAllDmx = logging.getLogger('output.allDmx')


class Output:
    send a binary buffer of values to some output device. Call update
    as often as you want- the result will be sent as soon as possible,
    and with repeats as needed to outlast hardware timeouts.

    This base class doesn't ever call _write. Subclasses below have
    strategies for that.
@@ -60,66 +61,60 @@ class Output:
    def crash(self):
        log.error('unrecoverable- exiting')
        cast(IReactorCore, reactor).crash()


class DummyOutput(Output):

    def __init__(self, uri, **kw):

    def update(self, buf: bytes):
'dummy update {list(map(int,buf[:80]))}')


class BackgroundLoopOutput(Output):
    """Call _write forever at 20hz in background threads"""

    rate: float

    def __init__(self, uri, rate=22):
        self.rate = rate
        self._currentBuffer = b''

        async def _loop(self):

    def _loop(self):
    async def _loop(self):
        while True:
            await asyncio.sleep(.1)

    def _loop_one(self):
        start = time.time()
        sendingBuffer = self._currentBuffer

        def done(worked):
            metrics('write_success', output=self.shortId()).incr() # type: ignore
            delay = max(0, start + 1 / self.rate - time.time())
            cast(IReactorTime, reactor).callLater(delay, self._loop) # type: ignore

        def err(e):
            metrics('write_fail', output=self.shortId()).incr() # type: ignore
            cast(IReactorTime, reactor).callLater(.2, self._loop) # type: ignore

        d = threads.deferToThread(self._write, sendingBuffer)
        d.addCallbacks(done, err)
        #tenacity retry


class FtdiDmx(BackgroundLoopOutput):

    def __init__(self, uri, lastDmxChannel, rate=22):
        self.lastDmxChannel = lastDmxChannel
        from .dmx_controller_output import OpenDmxUsb
        self.dmx = OpenDmxUsb()

    def _write(self, buf):
        with metrics('write', output=self.shortId()).time():
            if not buf:
                logAllDmx.debug('%s: empty buf- no output', self.shortId())

            # ok to truncate the last channels if they just went
            # to 0? No it is not. DMX receivers don't add implicit
            # zeros there.
            buf = bytes([0]) + buf[:self.lastDmxChannel]

            if logAllDmx.isEnabledFor(logging.DEBUG):
                # for testing fps, smooth fades, etc
                logAllDmx.debug('%s: %s...' % (self.shortId(), ' '.join(map(str, buf[:32]))))
@@ -44,59 +44,61 @@ class Updates(WebSocketEndpoint, UiListe

    async def sendMessage(self, msgText):
        await self.websocket.send_text(msgText)

    # async def on_receive(self, websocket, data):
    #     json.loads(data)

    async def on_disconnect(self, websocket: WebSocket, close_code: int):



async def PutAttrs(collector: Collector, request):
    with STAT_SETATTR.time():
        client, clientSession, settings, sendTime = parseJsonMessage(await request.body())
        collector.setAttrs(client, clientSession, settings, sendTime)
        return Response('', status_code=202)


def main():

    graph = SyncedGraph(networking.rdfdb.url, "collector")

        # todo: drive outputs with config files
        rate = 30
        outputs: List[Output] = [
            # ArtnetDmx(L9['output/dmxA/'],
            #           host='',
            #           port=6445,
            #           rate=rate),
            #sudo chmod a+rw /dev/bus/usb/003/021
            Udmx(L9['output/dmxA/'], bus=3, address=21, lastDmxChannel=100),
    except Exception:
        log.error("setting up outputs:")
    listeners = WebListeners()
    c = Collector(graph, outputs, listeners)

    app = Starlette(
            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
            WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
            Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']),

    app.add_route("/metrics", handle_metrics)

    # loadtest = os.environ.get('LOADTEST', False)  # call myself with some synthetic load then exit
    # if loadtest:
    #     # in a subprocess since we don't want this client to be
plays back effect notes from the timeline (and an untimed note from the faders)

import asyncio
import json
import logging
import time

import aiohttp
from louie import dispatcher
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from light9 import networking
from light9.collector.collector_client_asyncio import sendToCollector
from light9.effect.sequencer.eval_faders import FaderEval
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
from light9.effect.settings import DeviceSettings
from light9.metrics import metrics
from light9.run_local import log


async def changes():
    state = {}
    q = asyncio.Queue()

    def onBroadcast(update):

    dispatcher.connect(onBroadcast, StateUpdate)

    lastSend = 0
    while True:
        await q.get()
        now = time.time()
        if now > lastSend + .2:
            lastSend = now
            yield json.dumps(state)


async def send_page_updates(request):
    return EventSourceResponse(changes())




async def _send_one(faders: FaderEval):
    ds = faders.computeOutput()
    await sendToCollector('effectSequencer', session='0', settings=ds)


async def _forever(faders):
    prevFail = True
    while True:
        await _send_one(faders)
            if prevFail:
            prevFail = False
        except (aiohttp.ClientConnectorError, aiohttp.ClientOSError) as e:
            log.warn(f'{e!r} - retrying')
            prevFail = True
            await asyncio.sleep(2)
        await asyncio.sleep(0.1)


def send_updates_forever(faders):




def main():
    session = 'effectSequencer'
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")

    async def send(settings: DeviceSettings):
        await sendToCollector('effectSequencer', session, settings)

    # seq = Sequencer(graph, send)  # per-song timed notes
    faders = FaderEval(graph, send)  # bin/fade's untimed notes
    # asyncio.create_task(faders.startUpdating())

