Changeset - f79fff92990b
[Not reviewed]
default
0 4 0
drewp@bigasterisk.com - 20 months ago 2023-05-21 02:20:52
drewp@bigasterisk.com
collector.output use asyncio loop, not twisted loop. other cleanups.
4 files changed with 36 insertions and 20 deletions:
0 comments (0 inline, 0 general)
light9/collector/collector.py
Show inline comments
 
import logging
 
import time
 
from typing import Dict, List, Set, Tuple, cast
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import URIRef
 

	
 
from light9.collector.device import resolve, toOutputAttrs
 
from light9.collector.output import Output as OutputInstance
 
from light9.collector.weblisteners import WebListeners
 
from light9.effect.settings import DeviceSettings
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
 
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, typedValue)
 

	
 
log = logging.getLogger('collector')
 

	
 

	
 
def uriTail(u: URIRef) -> str:
 
    tail = u.rstrip('/').rsplit('/', 1)[1]
 
    if not tail:
 
        tail = str(u)
 
    return tail
 

	
 

	
 
def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
 
    return DmxMessageIndex(base + offset - 1)
 

	
 

	
 
def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
 
    """From rdf config graph, compute a map of
 
       (device, outputattr) : (output, index)
 
    that explains which output index to set for any device update.
 
    """
 
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
 

	
 
    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
 
        log.info('mapping DeviceClass %s', dc)
 
        for dev in graph.subjects(RDF.type, dc):
 
            dev = cast(DeviceUri, dev)
 
            log.info('  mapping device %s', dev)
 
            log.info('  💡 mapping device %s', dev)
 
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
 
            if universe not in outputs:
 
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
 
            try:
 
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
 
            except ValueError:
 
                raise ValueError('no :dmxBase for %s' % dev)
 

	
 
            for row in graph.objects(dc, L9['attr']):
 
            for row in sorted(graph.objects(dc, L9['attr']), key=str):
 
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
 
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
 
                index = makeDmxMessageIndex(dmxBase, offset)
 
                ret[(dev, outputAttr)] = (universe, index)
 
                log.debug('    map %s to %s,%s', outputAttr, universe, index)
 
                log.info(f'      {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}')
 
    return ret
 

	
 

	
 
class Collector:
 
    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""
 
    """receives setAttrs calls; combines settings; renders them into what outputs like; calls Output.update"""
 

	
 
    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
 
        self.graph = graph
 
        self.outputs = outputs
 
        self.listeners = listeners
 
        self.clientTimeoutSec = clientTimeoutSec
 
        self.initTime = time.time()
 
        self.allDevices: Set[DeviceUri] = set()
 
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}
 

	
 
        self.graph.addHandler(self._rebuildOutputMap)
 

	
 
@@ -133,25 +142,25 @@ class Collector:
 
                # not going to 0.
 
                if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus']]:
 
                    self.stickyAttrs[(device, deviceAttr)] = cast(float, value)
 

	
 
        # e.g. don't let an unspecified rotation go to 0
 
        for (d, da), v in self.stickyAttrs.items():
 
            daDict = deviceAttrs.setdefault(d, {})
 
            if da not in daDict:
 
                daDict[da] = v
 

	
 
        return deviceAttrs
 

	
 
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[DeviceSetting], sendTime: UnixTime):
 
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime):
 
        """
 
        settings is a list of (device, attr, value). These attrs are
 
        device attrs. We resolve conflicting values, process them into
 
        output attrs, and call Output.update to send the new outputs.
 

	
 
        client is a string naming the type of client. (client,
 
        clientSession) is a unique client instance.
 

	
 
        Each client session's last settings will be forgotten after
 
        clientTimeoutSec.
 
        """
 
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
light9/collector/output.py
Show inline comments
 
import asyncio
 
from typing import cast
 
from rdflib import URIRef
 
import socket
 
import struct
 
import time
 
import usb.core
 
import logging
 
from twisted.internet import threads, reactor, task
 
from twisted.internet.interfaces import IReactorCore, IReactorTime
 
from light9.metrics import metrics
 

	
 
log = logging.getLogger('output')
 
@@ -72,42 +73,36 @@ class DummyOutput(Output):
 

	
 

	
 
class BackgroundLoopOutput(Output):
 
    """Call _write forever at 20hz in background threads"""
 

	
 
    rate: float
 

	
 
    def __init__(self, uri, rate=22):
 
        super().__init__(uri)
 
        self.rate = rate
 
        self._currentBuffer = b''
 

	
 
        self._loop()
 
        self._task = asyncio.create_task(self._loop())
 

	
 
    def _loop(self):
 
    async def _loop(self):
 
        while True:
 
            self._loop_one()
 
            await asyncio.sleep(.1)
 

	
 
    def _loop_one(self):
 
        start = time.time()
 
        sendingBuffer = self._currentBuffer
 

	
 
        def done(worked):
 
            metrics('write_success', output=self.shortId()).incr() # type: ignore
 
            delay = max(0, start + 1 / self.rate - time.time())
 
            cast(IReactorTime, reactor).callLater(delay, self._loop) # type: ignore
 

	
 
        def err(e):
 
            metrics('write_fail', output=self.shortId()).incr() # type: ignore
 
            log.error(e)
 
            cast(IReactorTime, reactor).callLater(.2, self._loop) # type: ignore
 

	
 
        d = threads.deferToThread(self._write, sendingBuffer)
 
        d.addCallbacks(done, err)
 
        #tenacity retry
 
        self._write(sendingBuffer)
 

	
 

	
 
class FtdiDmx(BackgroundLoopOutput):
 

	
 
    def __init__(self, uri, lastDmxChannel, rate=22):
 
        super().__init__(uri)
 
        self.lastDmxChannel = lastDmxChannel
 
        from .dmx_controller_output import OpenDmxUsb
 
        self.dmx = OpenDmxUsb()
 

	
 
    def _write(self, buf):
 
        with metrics('write', output=self.shortId()).time():
light9/collector/service.py
Show inline comments
 
@@ -56,35 +56,37 @@ class Updates(WebSocketEndpoint, UiListe
 

	
 

	
 
async def PutAttrs(collector: Collector, request):
 
    with STAT_SETATTR.time():
 
        client, clientSession, settings, sendTime = parseJsonMessage(await request.body())
 
        collector.setAttrs(client, clientSession, settings, sendTime)
 
        return Response('', status_code=202)
 

	
 

	
 
def main():
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('output.allDmx').setLevel(logging.WARNING)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
    try:
 
        # todo: drive outputs with config files
 
        rate = 30
 
        outputs: List[Output] = [
 
            # ArtnetDmx(L9['output/dmxA/'],
 
            #           host='127.0.0.1',
 
            #           port=6445,
 
            #           rate=rate),
 
            #sudo chmod a+rw /dev/bus/usb/003/021
 
            Udmx(L9['output/dmxA/'], bus=3, address=21, lastDmxChannel=100),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
 
    listeners = WebListeners()
 
    c = Collector(graph, outputs, listeners)
 

	
 
    app = Starlette(
 
        debug=True,
 
        routes=[
light9/effect/sequencer/service.py
Show inline comments
 
"""
 
plays back effect notes from the timeline (and an untimed note from the faders)
 
"""
 

	
 
import asyncio
 
import json
 
import logging
 
import time
 

	
 
import aiohttp
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.applications import Starlette
 
from starlette.routing import Route
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
from light9 import networking
 
from light9.collector.collector_client_asyncio import sendToCollector
 
from light9.effect.sequencer.eval_faders import FaderEval
 
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
 
from light9.effect.settings import DeviceSettings
 
@@ -46,26 +47,35 @@ async def send_page_updates(request):
 
    return EventSourceResponse(changes())
 

	
 

	
 
###################################################################
 

	
 

	
 
async def _send_one(faders: FaderEval):
 
    ds = faders.computeOutput()
 
    await sendToCollector('effectSequencer', session='0', settings=ds)
 

	
 

	
 
async def _forever(faders):
 
    prevFail = True
 
    while True:
 
        try:
 
        await _send_one(faders)
 
            if prevFail:
 
                log.info('connected')
 
            prevFail = False
 
        except (aiohttp.ClientConnectorError, aiohttp.ClientOSError) as e:
 
            log.warn(f'{e!r} - retrying')
 
            prevFail = True
 
            await asyncio.sleep(2)
 
        await asyncio.sleep(0.1)
 

	
 

	
 
def send_updates_forever(faders):
 
    asyncio.create_task(_forever(faders))
 

	
 

	
 
####################################################################
 

	
 

	
 
def main():
 
    session = 'effectSequencer'
0 comments (0 inline, 0 general)