Changeset f79fff92990b (branch: default)
drewp@bigasterisk.com - 2023-05-21 02:20:52 (20 months ago)

collector.output use asyncio loop, not twisted loop. other cleanups.

4 files changed with 36 insertions and 20 deletions
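The headline change: BackgroundLoopOutput previously rescheduled itself through Twisted callbacks (threads.deferToThread plus reactor.callLater) and now runs as one long-lived asyncio task. A minimal runnable sketch of the before/after pattern, simplified from the output.py diff below (the print is a stand-in for the real hardware write):

import asyncio

class BackgroundLoopOutput:

    def __init__(self):
        self._currentBuffer = b''
        # before: self._loop() started a chain of reactor.callLater
        # reschedules; now a single asyncio task owns the loop
        self._task = asyncio.create_task(self._loop())

    async def _loop(self):
        while True:
            self._loop_one()
            await asyncio.sleep(.1)

    def _loop_one(self):
        # the old version pushed this through threads.deferToThread and
        # re-armed reactor.callLater from its done/err callbacks
        self._write(self._currentBuffer)

    def _write(self, buf: bytes):
        print(f'would send {len(buf)} bytes')  # stand-in for hardware output

async def demo():
    out = BackgroundLoopOutput()  # create_task needs a running loop
    out._currentBuffer = b'\x00' * 512
    await asyncio.sleep(.35)  # let a few iterations run
    out._task.cancel()  # tidy shutdown for the demo

asyncio.run(demo())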
light9/collector/collector.py

import logging
import time
from typing import Dict, List, Set, Tuple, cast

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef

from light9.collector.device import resolve, toOutputAttrs
from light9.collector.output import Output as OutputInstance
from light9.collector.weblisteners import WebListeners
from light9.effect.settings import DeviceSettings
from light9.namespaces import L9, RDF
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, typedValue)

log = logging.getLogger('collector')


def uriTail(u: URIRef) -> str:
    tail = u.rstrip('/').rsplit('/', 1)[1]
    if not tail:
        tail = str(u)
    return tail


def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
    return DmxMessageIndex(base + offset - 1)


def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
    """From rdf config graph, compute a map of
       (device, outputattr) : (output, index)
    that explains which output index to set for any device update.
    """
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})

    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
        log.info('mapping DeviceClass %s', dc)
        for dev in graph.subjects(RDF.type, dc):
            dev = cast(DeviceUri, dev)
-            log.info('  mapping device %s', dev)
+            log.info('  💡 mapping device %s', dev)
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
            if universe not in outputs:
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
            try:
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
            except ValueError:
                raise ValueError('no :dmxBase for %s' % dev)

-            for row in graph.objects(dc, L9['attr']):
+            for row in sorted(graph.objects(dc, L9['attr']), key=str):
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
                index = makeDmxMessageIndex(dmxBase, offset)
                ret[(dev, outputAttr)] = (universe, index)
-                log.debug('    map %s to %s,%s', outputAttr, universe, index)
+                log.info(f'      {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}')
    return ret


class Collector:
-    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""
+    """receives setAttrs calls; combines settings; renders them into what outputs like; calls Output.update"""

    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
        self.graph = graph
        self.outputs = outputs
        self.listeners = listeners
        self.clientTimeoutSec = clientTimeoutSec
        self.initTime = time.time()
        self.allDevices: Set[DeviceUri] = set()
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}

        self.graph.addHandler(self._rebuildOutputMap)

        # rename to activeSessions?
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}

        # (dev, devAttr): value to use instead of 0
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}

    def _rebuildOutputMap(self):
        self.allDevices.clear()

        self._outputByUri.clear()
        for out in self.outputs:
            self._outputByUri[OutputUri(out.uri)] = out

@@ -121,49 +130,49 @@ class Collector:
        for _, lastSettings in lastRequests:
            for (device, deviceAttr), value in lastSettings.items():
                if (device, deviceAttr) in self.remapOut:
                    start, end = self.remapOut[(device, deviceAttr)]
                    value = start + float(value) * (end - start)

                attrs = deviceAttrs.setdefault(device, {})
                if deviceAttr in attrs:
                    value = resolve(device, deviceAttr, [attrs[deviceAttr], value])
                attrs[deviceAttr] = value
                # list should come from the graph. these are attrs
                # that should default to holding the last position,
                # not going to 0.
                if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus']]:
                    self.stickyAttrs[(device, deviceAttr)] = cast(float, value)

        # e.g. don't let an unspecified rotation go to 0
        for (d, da), v in self.stickyAttrs.items():
            daDict = deviceAttrs.setdefault(d, {})
            if da not in daDict:
                daDict[da] = v

        return deviceAttrs

-    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[DeviceSetting], sendTime: UnixTime):
+    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime):
        """
        settings is a list of (device, attr, value). These attrs are
        device attrs. We resolve conflicting values, process them into
        output attrs, and call Output.update to send the new outputs.

        client is a string naming the type of client. (client,
        clientSession) is a unique client instance.

        Each client session's last settings will be forgotten after
        clientTimeoutSec.
        """
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
        clientSession = ClientSessionType("no_longer_used")

        now = UnixTime(time.time())
        self._warnOnLateRequests(client, now, sendTime)

        self._forgetStaleClients(now)

        self._acceptNewClientSessionSettings(client, clientSession, settings, now)

        deviceAttrs = self._merge(iter(self.lastRequest.values()))

        outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {})
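A note on makeDmxMessageIndex: assuming, as the `- 1` suggests, that :dmxBase and :dmxOffset are both 1-based in the config graph, the function yields consecutive message slots per outputAttr. A toy check with made-up values (the real ones come from the rdf graph):

dmxBase = 21  # hypothetical device base address
for offset, attr in [(1, 'red'), (2, 'green'), (3, 'blue')]:  # hypothetical attrs
    index = dmxBase + offset - 1  # what makeDmxMessageIndex computes
    print(f'{attr}: DmxMessageIndex {index}')
# red: DmxMessageIndex 21, green: 22, blue: 23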
light9/collector/output.py

+import asyncio
from typing import cast
from rdflib import URIRef
import socket
import struct
import time
import usb.core
import logging
from twisted.internet import threads, reactor, task
from twisted.internet.interfaces import IReactorCore, IReactorTime
from light9.metrics import metrics

log = logging.getLogger('output')
logAllDmx = logging.getLogger('output.allDmx')


class Output:
    """
    send a binary buffer of values to some output device. Call update
    as often as you want- the result will be sent as soon as possible,
    and with repeats as needed to outlast hardware timeouts.

    This base class doesn't ever call _write. Subclasses below have
    strategies for that.
    """

@@ -60,66 +61,60 @@ class Output:
    def crash(self):
        log.error('unrecoverable- exiting')
        cast(IReactorCore, reactor).crash()


class DummyOutput(Output):

    def __init__(self, uri, **kw):
        super().__init__(uri)

    def update(self, buf: bytes):
        log.info(f'dummy update {list(map(int,buf[:80]))}')


class BackgroundLoopOutput(Output):
    """Call _write forever at 20hz in background threads"""

    rate: float

    def __init__(self, uri, rate=22):
        super().__init__(uri)
        self.rate = rate
        self._currentBuffer = b''

-        self._loop()
+        self._task = asyncio.create_task(self._loop())

-    def _loop(self):
+    async def _loop(self):
+        while True:
+            self._loop_one()
+            await asyncio.sleep(.1)

+    def _loop_one(self):
        start = time.time()
        sendingBuffer = self._currentBuffer

-        def done(worked):
-            metrics('write_success', output=self.shortId()).incr() # type: ignore
-            delay = max(0, start + 1 / self.rate - time.time())
-            cast(IReactorTime, reactor).callLater(delay, self._loop) # type: ignore
-
-        def err(e):
-            metrics('write_fail', output=self.shortId()).incr() # type: ignore
-            log.error(e)
-            cast(IReactorTime, reactor).callLater(.2, self._loop) # type: ignore
-
-        d = threads.deferToThread(self._write, sendingBuffer)
-        d.addCallbacks(done, err)
+        #tenacity retry
+        self._write(sendingBuffer)


class FtdiDmx(BackgroundLoopOutput):

    def __init__(self, uri, lastDmxChannel, rate=22):
        super().__init__(uri)
        self.lastDmxChannel = lastDmxChannel
        from .dmx_controller_output import OpenDmxUsb
        self.dmx = OpenDmxUsb()

    def _write(self, buf):
        with metrics('write', output=self.shortId()).time():
            if not buf:
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
                return

            # ok to truncate the last channels if they just went
            # to 0? No it is not. DMX receivers don't add implicit
            # zeros there.
            buf = bytes([0]) + buf[:self.lastDmxChannel]

            if logAllDmx.isEnabledFor(logging.DEBUG):
                # for testing fps, smooth fades, etc
                logAllDmx.debug('%s: %s...' % (self.shortId(), ' '.join(map(str, buf[:32]))))
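The `#tenacity retry` comment in the new _loop_one marks where the removed done/err callbacks used to count write_success/write_fail and reschedule .2 s after a failure. If that TODO were filled in with the tenacity library, it might look like this sketch (guardedWrite is a hypothetical helper, not part of this changeset):

from tenacity import retry, stop_after_attempt, wait_fixed

@retry(wait=wait_fixed(.2), stop=stop_after_attempt(3), reraise=True)
def guardedWrite(write, buf: bytes):
    # retry transient hardware errors, roughly like the old err()
    # callback's reactor.callLater(.2, self._loop) rescheduling did
    write(buf)

# in _loop_one, this would replace the bare call:
#     guardedWrite(self._write, sendingBuffer)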
light9/collector/service.py

@@ -44,59 +44,61 @@ class Updates(WebSocketEndpoint, UiListe
        self.listeners.addClient(self)

    async def sendMessage(self, msgText):
        await self.websocket.send_text(msgText)

    # async def on_receive(self, websocket, data):
    #     json.loads(data)

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        self.listeners.delClient(self)

    pass


async def PutAttrs(collector: Collector, request):
    with STAT_SETATTR.time():
        client, clientSession, settings, sendTime = parseJsonMessage(await request.body())
        collector.setAttrs(client, clientSession, settings, sendTime)
        return Response('', status_code=202)


def main():
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
    logging.getLogger('output.allDmx').setLevel(logging.WARNING)

    graph = SyncedGraph(networking.rdfdb.url, "collector")

    try:
        # todo: drive outputs with config files
        rate = 30
        outputs: List[Output] = [
            # ArtnetDmx(L9['output/dmxA/'],
            #           host='127.0.0.1',
            #           port=6445,
            #           rate=rate),
            #sudo chmod a+rw /dev/bus/usb/003/021
            Udmx(L9['output/dmxA/'], bus=3, address=21, lastDmxChannel=100),
        ]
    except Exception:
        log.error("setting up outputs:")
        traceback.print_exc()
        raise
    listeners = WebListeners()
    c = Collector(graph, outputs, listeners)

    app = Starlette(
        debug=True,
        routes=[
            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
            WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
            Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']),
        ],
    )

    app.add_middleware(PrometheusMiddleware)
    app.add_route("/metrics", handle_metrics)

    # loadtest = os.environ.get('LOADTEST', False)  # call myself with some synthetic load then exit
    # if loadtest:
    #     # in a subprocess since we don't want this client to be
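main() builds the Starlette app, but the visible hunk ends before the serving code. For orientation, one conventional way such an app is run (an assumption for illustration, not code from this changeset; the port is made up and the real one would come from light9's networking module):

import uvicorn  # assumption: serving library not shown in this diff

# hypothetical: serve the app built in main(); 8202 is a made-up port
uvicorn.run(app, host='0.0.0.0', port=8202)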
light9/effect/sequencer/service.py

"""
plays back effect notes from the timeline (and an untimed note from the faders)
"""

import asyncio
import json
import logging
import time

import aiohttp
from louie import dispatcher
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from light9 import networking
from light9.collector.collector_client_asyncio import sendToCollector
from light9.effect.sequencer.eval_faders import FaderEval
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
from light9.effect.settings import DeviceSettings
from light9.metrics import metrics
from light9.run_local import log


async def changes():
    state = {}
    q = asyncio.Queue()

    def onBroadcast(update):
        state.update(update)
        q.put_nowait(None)

    dispatcher.connect(onBroadcast, StateUpdate)

    lastSend = 0
    while True:
        await q.get()
        now = time.time()
        if now > lastSend + .2:
            lastSend = now
            yield json.dumps(state)


async def send_page_updates(request):
    return EventSourceResponse(changes())


###################################################################


async def _send_one(faders: FaderEval):
    ds = faders.computeOutput()
    await sendToCollector('effectSequencer', session='0', settings=ds)


async def _forever(faders):
    prevFail = True
    while True:
        try:
            await _send_one(faders)
            if prevFail:
                log.info('connected')
            prevFail = False
        except (aiohttp.ClientConnectorError, aiohttp.ClientOSError) as e:
            log.warn(f'{e!r} - retrying')
            prevFail = True
            await asyncio.sleep(2)
        await asyncio.sleep(0.1)


def send_updates_forever(faders):
    asyncio.create_task(_forever(faders))


####################################################################


def main():
    session = 'effectSequencer'
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
    logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)

    async def send(settings: DeviceSettings):
        await sendToCollector('effectSequencer', session, settings)

    # seq = Sequencer(graph, send)  # per-song timed notes
    faders = FaderEval(graph, send)  # bin/fade's untimed notes
    # asyncio.create_task(faders.startUpdating())
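For context on changes() in the sequencer service: it coalesces StateUpdate broadcasts and emits at most one SSE message per 0.2 s, merging intermediate updates into the shared state dict. The broadcasting side lives in sequencer.py (not shown in this changeset); with louie it would look roughly like this sketch (the payload keys are hypothetical):

from louie import dispatcher
from light9.effect.sequencer.sequencer import StateUpdate

# hypothetical broadcast; changes() receives it as onBroadcast(update=...)
# and folds it into the state it streams to the page
dispatcher.send(StateUpdate, update={'songTime': 12.3, 'playing': True})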