Changeset - f79fff92990b
[Not reviewed]
default
0 4 0
drewp@bigasterisk.com - 20 months ago 2023-05-21 02:20:52
drewp@bigasterisk.com
collector.output use asyncio loop, not twisted loop. other cleanups.
4 files changed with 37 insertions and 21 deletions:
0 comments (0 inline, 0 general)
light9/collector/collector.py
Show inline comments
 
@@ -3,10 +3,12 @@ import time
 
from typing import Dict, List, Set, Tuple, cast
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import URIRef
 

	
 
from light9.collector.device import resolve, toOutputAttrs
 
from light9.collector.output import Output as OutputInstance
 
from light9.collector.weblisteners import WebListeners
 
from light9.effect.settings import DeviceSettings
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
 
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, typedValue)
 
@@ -14,6 +16,13 @@ from light9.newtypes import (ClientSessi
 
log = logging.getLogger('collector')
 

	
 

	
 
def uriTail(u: URIRef) -> str:
 
    tail = u.rstrip('/').rsplit('/', 1)[1]
 
    if not tail:
 
        tail = str(u)
 
    return tail
 

	
 

	
 
def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
 
    return DmxMessageIndex(base + offset - 1)
 

	
 
@@ -29,7 +38,7 @@ def _outputMap(graph: SyncedGraph, outpu
 
        log.info('mapping DeviceClass %s', dc)
 
        for dev in graph.subjects(RDF.type, dc):
 
            dev = cast(DeviceUri, dev)
 
            log.info('  mapping device %s', dev)
 
            log.info('  💡 mapping device %s', dev)
 
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
 
            if universe not in outputs:
 
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
 
@@ -38,17 +47,17 @@ def _outputMap(graph: SyncedGraph, outpu
 
            except ValueError:
 
                raise ValueError('no :dmxBase for %s' % dev)
 

	
 
            for row in graph.objects(dc, L9['attr']):
 
            for row in sorted(graph.objects(dc, L9['attr']), key=str):
 
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
 
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
 
                index = makeDmxMessageIndex(dmxBase, offset)
 
                ret[(dev, outputAttr)] = (universe, index)
 
                log.debug('    map %s to %s,%s', outputAttr, universe, index)
 
                log.info(f'      {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}')
 
    return ret
 

	
 

	
 
class Collector:
 
    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""
 
    """receives setAttrs calls; combines settings; renders them into what outputs like; calls Output.update"""
 

	
 
    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
 
        self.graph = graph
 
@@ -142,7 +151,7 @@ class Collector:
 

	
 
        return deviceAttrs
 

	
 
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[DeviceSetting], sendTime: UnixTime):
 
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime):
 
        """
 
        settings is a list of (device, attr, value). These attrs are
 
        device attrs. We resolve conflicting values, process them into
light9/collector/output.py
Show inline comments
 
import asyncio
 
from typing import cast
 
from rdflib import URIRef
 
import socket
 
@@ -81,24 +82,18 @@ class BackgroundLoopOutput(Output):
 
        self.rate = rate
 
        self._currentBuffer = b''
 

	
 
        self._loop()
 
        self._task = asyncio.create_task(self._loop())
 

	
 
    def _loop(self):
 
    async def _loop(self):
 
        while True:
 
            self._loop_one()
 
            await asyncio.sleep(.1)
 

	
 
    def _loop_one(self):
 
        start = time.time()
 
        sendingBuffer = self._currentBuffer
 

	
 
        def done(worked):
 
            metrics('write_success', output=self.shortId()).incr() # type: ignore
 
            delay = max(0, start + 1 / self.rate - time.time())
 
            cast(IReactorTime, reactor).callLater(delay, self._loop) # type: ignore
 

	
 
        def err(e):
 
            metrics('write_fail', output=self.shortId()).incr() # type: ignore
 
            log.error(e)
 
            cast(IReactorTime, reactor).callLater(.2, self._loop) # type: ignore
 

	
 
        d = threads.deferToThread(self._write, sendingBuffer)
 
        d.addCallbacks(done, err)
 
        #tenacity retry
 
        self._write(sendingBuffer)
 

	
 

	
 
class FtdiDmx(BackgroundLoopOutput):
light9/collector/service.py
Show inline comments
 
@@ -65,6 +65,7 @@ async def PutAttrs(collector: Collector,
 
def main():
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('output.allDmx').setLevel(logging.WARNING)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
@@ -76,6 +77,7 @@ def main():
 
            #           host='127.0.0.1',
 
            #           port=6445,
 
            #           rate=rate),
 
            #sudo chmod a+rw /dev/bus/usb/003/021
 
            Udmx(L9['output/dmxA/'], bus=3, address=21, lastDmxChannel=100),
 
        ]
 
    except Exception:
light9/effect/sequencer/service.py
Show inline comments
 
@@ -7,6 +7,7 @@ import json
 
import logging
 
import time
 

	
 
import aiohttp
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
@@ -55,8 +56,17 @@ async def _send_one(faders: FaderEval):
 

	
 

	
 
async def _forever(faders):
 
    prevFail = True
 
    while True:
 
        await _send_one(faders)
 
        try:
 
            await _send_one(faders)
 
            if prevFail:
 
                log.info('connected')
 
            prevFail = False
 
        except (aiohttp.ClientConnectorError, aiohttp.ClientOSError) as e:
 
            log.warn(f'{e!r} - retrying')
 
            prevFail = True
 
            await asyncio.sleep(2)
 
        await asyncio.sleep(0.1)
 

	
 

	
0 comments (0 inline, 0 general)