Changeset - 9b6f4b3c329c
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 20 months ago 2023-05-24 06:42:33
drewp@bigasterisk.com
correct some output loop rates; clean up
3 files changed with 13 insertions and 32 deletions:
0 comments (0 inline, 0 general)
light9/collector/output.py
 
@@ -41,98 +41,100 @@ class Output:
        """short string to distinguish outputs"""
        return self.uri.rstrip('/').rsplit('/')[-1]

    def update(self, buf: bytes) -> None:
        """caller asks for the output to be this buffer"""
        self._currentBuffer = buf

    def _periodicLog(self):
        msg = '%s: %s' % (self.shortId(), ' '.join(map(str, self._currentBuffer)))
        if msg != self._lastLoggedMsg:
            log.debug(msg)
            self._lastLoggedMsg = msg

    def _write(self, buf: bytes) -> None:
        """
        write buffer to output hardware (may be throttled if updates are
        too fast, or repeated if they are too slow)
        """
        pass

    def crash(self):
        log.error('unrecoverable- exiting')
        cast(IReactorCore, reactor).crash()


class DummyOutput(Output):

    def __init__(self, uri, **kw):
        super().__init__(uri)

    def update(self, buf: bytes):
        log.info(f'dummy update {list(map(int,buf[:80]))}')


class BackgroundLoopOutput(Output):
    """Call _write forever at 20hz in background threads"""

    rate: float

    def __init__(self, uri, rate=22):
        super().__init__(uri)
        self.rate = rate
        self._currentBuffer = b''

        self._task = asyncio.create_task(self._loop())

    async def _loop(self):
        while True:
            t1 = time.time()
            self._loop_one()
            await asyncio.sleep(.1)
            remain = max(0, 1 / self.rate - (time.time() - t1))
            await asyncio.sleep(remain)

    def _loop_one(self):
        start = time.time()
        sendingBuffer = self._currentBuffer
        # tenacity retry
        self._write(sendingBuffer)


class FtdiDmx(BackgroundLoopOutput):

    def __init__(self, uri, lastDmxChannel, rate=22):
        super().__init__(uri)
        self.lastDmxChannel = lastDmxChannel
        from .dmx_controller_output import OpenDmxUsb
        self.dmx = OpenDmxUsb()

    def _write(self, buf):
        with metrics('write', output=self.shortId()).time():
            if not buf:
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
                return

            # ok to truncate the last channels if they just went
            # to 0? No it is not. DMX receivers don't add implicit
            # zeros there.
            buf = bytes([0]) + buf[:self.lastDmxChannel]

            if logAllDmx.isEnabledFor(logging.DEBUG):
                # for testing fps, smooth fades, etc
                logAllDmx.debug('%s: %s...' % (self.shortId(), ' '.join(map(str, buf[:32]))))

            self.dmx.send_dmx(buf)


class ArtnetDmx(BackgroundLoopOutput):
    # adapted from https://github.com/spacemanspiff2007/PyArtNet/blob/master/pyartnet/artnet_node.py (gpl3)
    def __init__(self, uri, host, port, rate):
        """sends UDP messages to the given host/port"""
        super().__init__(uri, rate)
        packet = bytearray()
        packet.extend(map(ord, "Art-Net"))
        packet.append(0x00)  # Null terminate Art-Net
        packet.extend([0x00, 0x50])  # Opcode ArtDMX 0x5000 (Little endian)
        packet.extend([0x00, 0x0e])  # Protocol version 14
        self.base_packet = packet
        self.sequence_counter = 255
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
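
On the loop-rate theme of this commit, here is a minimal, standalone sketch (not part of the changeset; paced_loop and work are hypothetical names) of the pacing idea in BackgroundLoopOutput._loop: each iteration measures its own elapsed time and sleeps only the remaining share of the 1/rate budget, so the loop settles near rate iterations per second. Any extra unconditional sleep per iteration adds to that budget and lowers the achievable rate.

import asyncio
import time


async def paced_loop(work, rate: float = 22, iterations: int = 100):
    # run `work()` roughly `rate` times per second (hypothetical helper,
    # illustrating the sleep-the-remainder pattern from _loop above)
    for _ in range(iterations):
        t1 = time.time()
        work()
        remain = max(0, 1 / rate - (time.time() - t1))  # time left in this frame
        await asyncio.sleep(remain)


if __name__ == '__main__':
    asyncio.run(paced_loop(lambda: print(round(time.time(), 3)), rate=22, iterations=5))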
 

	
light9/effect/sequencer/eval_faders.py
 
import asyncio
from dataclasses import dataclass
import logging
import time
from typing import Callable, Coroutine, List, Optional, cast
from light9.collector.collector import uriTail
from light9.typedgraph import typedValue

from rdfdb import SyncedGraph
from rdflib import URIRef

from light9.effect import effecteval
from light9.effect.sequencer import Note
from light9.effect.settings import DeviceSettings, EffectSettings
from light9.effect.simple_outputs import SimpleOutputs
from light9.metrics import metrics
from light9.namespaces import L9, RDF
from light9.newtypes import EffectAttr, EffectUri, NoteUri, UnixTime
from rdflib.term import Node

log = logging.getLogger('seq.fader')

@dataclass
class Fader:
    graph: SyncedGraph
    uri: URIRef
    effect: EffectUri
    setEffectAttr: EffectAttr

    value: Optional[float] = None  # mutable

    def __post_init__(self):
        self.ee = effecteval.EffectEval2(self.graph, self.effect)

class FaderEval:
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector

    """
    def __init__(self,
                 graph: SyncedGraph,
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None, None, None]],
                 ):
        self.graph = graph
        self.sendToCollector = sendToCollector

        self.faders: List[Fader] = []

        # self.simpleOutputs = SimpleOutputs(self.graph)
        log.info('fader adds handler')
        self.graph.addHandler(self._compile)
        self.lastLoopSucceeded = False

        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
        # have caller do this
        #asyncio.create_task(self.startUpdating())

    # async def startUpdating(self):
    #     await self.graph.addAsyncHandler(self.update)
    #     log.info('startupdating task done')

    def onCodeChange(self):
        log.debug('seq.onCodeChange')
        self.graph.addHandler(self._compile)
        #self.updateLoop()

    @metrics('compile_graph_fader').time()
    def _compile(self) -> None:
        """rebuild our data from the graph"""
        self.faders = []
        for fader in self.graph.subjects(RDF.type, L9['Fader']):
            effect = typedValue(EffectUri, self.graph, fader, L9['effect'])
            setting = typedValue(Node, self.graph, fader, L9['setting'])
            setAttr = typedValue(EffectAttr, self.graph, setting, L9['effectAttr'])
            self.faders.append(Fader(self.graph, cast(URIRef, fader), effect, setAttr))

        # this could go in a second, smaller addHandler call to avoid rebuilding Fader objs constantly
        for f in self.faders:
            f.value = typedValue(float, self.graph, f.uri, L9['value'])

    def computeOutput(self) -> DeviceSettings:
        notesSettings: List[DeviceSettings] = []
        now = UnixTime(time.time())
        for f in self.faders:
            if f.value is None:
                raise TypeError('f.value should be set by now')
            effectSettings = EffectSettings(self.graph, [(f.effect, f.setEffectAttr, f.value)])

            print(f'{effectSettings=}')
            notesSettings.append(f.ee.compute(effectSettings))

            # ee = effecteval.EffectEval(self.graph, f.effectClass, self.simpleOutputs)
            # deviceSettings, report = ee.outputFromEffect(
            #     effectSettings,
            #     songTime=now, # probably wrong
            #     noteTime=now, # wrong
            #     )
            # log.info(f'  𝅘𝅥𝅮  {uriTail(f.uri)}: {effectSettings=} -> {deviceSettings=}')
            # if deviceSettings:
            #     notesSettings.append(deviceSettings)
        return DeviceSettings.merge(self.graph, notesSettings)


    # @metrics('update_call_fader').time()
    # async def update(self):
    #     log.info(f'update {len(self.notes)=}')
    #     devSettings = self.computeOutput()
    #     with metrics('update_s3_send_fader').time():  # our measurement
    #         sendSecs = await self.sendToCollector(devSettings)
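
To make the data flow in FaderEval.computeOutput easier to follow, here is a toy, self-contained sketch using plain dicts instead of the real SyncedGraph/EffectSettings/DeviceSettings types. ToyFader, compute_output, and the max-wins merge below are illustrative assumptions, not the project's actual classes or merge policy: each fader holds a single scalar value, its effect turns that value into device levels, and the per-fader results are merged into one settings mapping.

from dataclasses import dataclass
from typing import Callable, Dict, List

DeviceLevels = Dict[str, float]  # deviceAttr -> level, standing in for DeviceSettings


@dataclass
class ToyFader:
    effect: Callable[[float], DeviceLevels]  # stands in for EffectEval2.compute
    value: float = 0.0


def compute_output(faders: List[ToyFader]) -> DeviceLevels:
    merged: DeviceLevels = {}
    for f in faders:
        for dev, level in f.effect(f.value).items():
            # assumed merge policy: highest value wins (the real
            # DeviceSettings.merge may combine values differently)
            merged[dev] = max(merged.get(dev, 0.0), level)
    return merged


if __name__ == '__main__':
    strobe = ToyFader(lambda v: {'light1/brightness': v}, value=0.8)
    wash = ToyFader(lambda v: {'light1/brightness': v * 0.5, 'light2/brightness': v}, value=0.6)
    print(compute_output([strobe, wash]))  # {'light1/brightness': 0.8, 'light2/brightness': 0.6}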
light9/effect/sequencer/service.py
 
"""
 
plays back effect notes from the timeline (and an untimed note from the faders)
 
"""
 

	
 
import asyncio
 
import json
 
import logging
 
import time
 

	
 
import aiohttp
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.applications import Starlette
 
from starlette.routing import Route
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
from lib.background_loop import loop_forever
 
from light9 import networking
 
from light9.collector.collector_client_asyncio import sendToCollector
 
from light9.effect.sequencer.eval_faders import FaderEval
 
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
 
from light9.effect.settings import DeviceSettings
 
from light9.metrics import metrics
 
from light9.run_local import log
 

	
 
from lib.background_loop import loop_forever
 
RATE = 30
 

	
 

	
 
async def changes():
 
    state = {}
 
    q = asyncio.Queue()
 

	
 
    def onBroadcast(update):
 
        state.update(update)
 
        q.put_nowait(None)
 

	
 
    dispatcher.connect(onBroadcast, StateUpdate)
 

	
 
    lastSend = 0
 
    while True:
 
        await q.get()
 
        now = time.time()
 
        if now > lastSend + .2:
 
            lastSend = now
 
            yield json.dumps(state)
 

	
 

	
 
async def send_page_updates(request):
 
    return EventSourceResponse(changes())
 

	
 

	
 
###################################################################
 

	
 

	
 
async def _send_one(faders: FaderEval):
 
    ds = faders.computeOutput()
 
    await sendToCollector('effectSequencer', session='0', settings=ds)
 

	
 

	
 

	
 
####################################################################
 

	
 

	
 
def main():
 
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)
 

	
 
    # seq = Sequencer(graph, send)  # per-song timed notes
 
    faders = FaderEval(graph)  # bin/fade's untimed notes
 
    faders = FaderEval(graph)  # bin/fade's untimed effects
 

	
 
    async def so(first_run):
 
        await _send_one(faders)
 
    faders_loop = loop_forever(so, metric_prefix='faders', sleep_period=.05, log_fps=True)
 
    #@metrics('computeAndSend').time() # needs rework with async
 
    async def update(first_run):
 
        ds = faders.computeOutput()
 
        await sendToCollector('effectSequencer', session='0', settings=ds)
 

	
 
    faders_loop = loop_forever(update, metric_prefix='faders', sleep_period=1 / RATE)
 

	
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            Route('/updates', endpoint=send_page_updates),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    return app
 

	
 

	
 
app = main()
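
For readers without the repo handy, here is a rough standalone approximation of what loop_forever(update, metric_prefix='faders', sleep_period=1 / RATE) is used for above: run an async callback repeatedly, passing first_run=True only on the first call, and sleep sleep_period between calls. This is a hypothetical sketch, not the project's lib.background_loop implementation, which also appears to record metrics and fps.

import asyncio

RATE = 30


def loop_forever_sketch(update, sleep_period: float) -> "asyncio.Task":
    # schedule `update(first_run)` to run forever in the current event loop,
    # roughly 1/sleep_period times per second; caller keeps the returned Task alive
    async def _run():
        first_run = True
        while True:
            await update(first_run)
            first_run = False
            await asyncio.sleep(sleep_period)

    return asyncio.create_task(_run())


async def demo():
    async def update(first_run):
        # in the service above, this is where computeOutput() and sendToCollector() happen
        print('tick', 'first_run' if first_run else '')

    task = loop_forever_sketch(update, sleep_period=1 / RATE)
    await asyncio.sleep(0.2)  # let a few iterations run, then stop the demo
    task.cancel()


if __name__ == '__main__':
    asyncio.run(demo())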