Changeset - a0abf52b06d6
[Not reviewed]
default
0 1 0
drewp@bigasterisk.com - 20 months ago 2023-06-01 21:16:45
drewp@bigasterisk.com
collector: auto find dmx address (at startup) if it reconnects usb with a new one
1 file changed with 13 insertions and 2 deletions:
0 comments (0 inline, 0 general)
light9/collector/service.py
Show inline comments
 
@@ -6,12 +6,13 @@ custom code for some attributes.
 

	
 
Input can be over http or zmq.
 
"""
 
import asyncio
 
import functools
 
import logging
 
import subprocess
 
import traceback
 
from typing import List
 

	
 
from light9 import networking
 
from light9.collector.collector import Collector
 
from light9.collector.output import ArtnetDmx, DummyOutput, Output, Udmx  # noqa
 
@@ -32,12 +33,13 @@ from starlette_exporter import Prometheu
 

	
 
import zmq
 
import zmq.asyncio
 

	
 
STAT_SETATTR = Summary('set_attr', 'setAttr calls')
 

	
 
# this is the rate sent to usb
 
RATE = 20
 

	
 

	
 
class Updates(WebSocketEndpoint, UiListener):
 

	
 
    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
 
@@ -88,30 +90,39 @@ async def zmqListener(collector):
 
            client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    except:
 
        traceback.print_exc()
 
        raise
 

	
 
def findDevice():
 
    for line in subprocess.check_output("lsusb").decode('utf8').splitlines():
 
        if '16c0:05dc' in line:
 
            words = line.split(':')[0].split()
 
            dev = f'/dev/bus/usb/{words[1]}/{words[3]}'
 
            log.info(f'device will be {dev}')
 
            return dev ,int(words[3])
 
    raise ValueError("no matching uDMX found")
 

	
 
def main():
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('output.allDmx').setLevel(logging.WARNING)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
    devPath, usbAddress = findDevice()
 
            # if user doesn't have r/w, fail now
 
    try:
 
        # todo: drive outputs with config files
 
        rate = 30
 
        outputs: List[Output] = [
 
            # ArtnetDmx(L9['output/dmxA/'],
 
            #           host='127.0.0.1',
 
            #           port=6445,
 
            #           rate=rate),
 
            #sudo chmod a+rw /dev/bus/usb/003/021
 
            Udmx(L9['output/dmxA/'], bus=3, address=21, lastDmxChannel=200, rate=RATE),
 
            Udmx(L9['output/dmxA/'], bus=1, address=usbAddress, lastDmxChannel=200, rate=RATE),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
 
    listeners = WebListeners()
0 comments (0 inline, 0 general)