Changeset - bea44be4df38
[Not reviewed]
default
0 2 0
drewp@bigasterisk.com - 20 months ago 2023-06-01 21:17:33
drewp@bigasterisk.com
log levels
2 files changed with 3 insertions and 1 deletions:
0 comments (0 inline, 0 general)
light9/collector/service.py
Show inline comments
 
@@ -61,96 +61,98 @@ class Updates(WebSocketEndpoint, UiListe
 
    async def on_disconnect(self, websocket: WebSocket, close_code: int):
 
        self.listeners.delClient(self)
 

	
 
    pass
 

	
 

	
 
async def PutAttrs(collector: Collector, request):
 
    with STAT_SETATTR.time():
 
        try:
 
            body = await request.body()
 
        except ClientDisconnect:
 
            log.warning("PUT /attrs request disconnected- ignoring")
 
            return Response('', status_code=400)
 
        client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, body)
 
        collector.setAttrs(client, clientSession, settings, sendTime)
 
        return Response('', status_code=202)
 

	
 

	
 
async def zmqListener(collector):
 
    try:
 
        ctx = zmq.asyncio.Context()
 
        sock = ctx.socket(zmq.SUB)
 
        sock.bind('tcp://127.0.0.1:9203')
 
        sock.subscribe(b'setAttr')
 
        while True:
 
            [topic, msg] = await sock.recv_multipart()
 
            if topic != b'setAttr':
 
                raise ValueError(topic)
 
            # log.info(f'zmq recv {len(msg)}')
 
            client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    except:
 
        traceback.print_exc()
 
        raise
 

	
 
def findDevice():
 
    for line in subprocess.check_output("lsusb").decode('utf8').splitlines():
 
        if '16c0:05dc' in line:
 
            words = line.split(':')[0].split()
 
            dev = f'/dev/bus/usb/{words[1]}/{words[3]}'
 
            log.info(f'device will be {dev}')
 
            return dev ,int(words[3])
 
    raise ValueError("no matching uDMX found")
 

	
 
def main():
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('output.allDmx').setLevel(logging.WARNING)
 
    logging.getLogger().setLevel(logging.DEBUG)
 
    logging.getLogger('collector').setLevel(logging.DEBUG)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
    devPath, usbAddress = findDevice()
 
            # if user doesn't have r/w, fail now
 
    try:
 
        # todo: drive outputs with config files
 
        outputs: List[Output] = [
 
            # ArtnetDmx(L9['output/dmxA/'],
 
            #           host='127.0.0.1',
 
            #           port=6445,
 
            #           rate=rate),
 
            #sudo chmod a+rw /dev/bus/usb/003/021
 
            Udmx(L9['output/dmxA/'], bus=1, address=usbAddress, lastDmxChannel=200, rate=RATE),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
 
    listeners = WebListeners()
 
    c = Collector(graph, outputs, listeners)
 
    zl = asyncio.create_task(zmqListener(c))
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
 
            WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
 
            Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']),
 
        ],
 
    )
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    # loadtest = os.environ.get('LOADTEST', False)  # call myself with some synthetic load then exit
 
    # if loadtest:
 
    #     # in a subprocess since we don't want this client to be
 
    #     # cooperating with the main event loop and only sending
 
    #     # requests when there's free time
 
    #     def afterWarmup():
 
    #         log.info('running collector_loadtest')
 
    #         d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py'])
 

	
 
    #         def done(*a):
 
    #             log.info('loadtest done')
 
    #             reactor.stop()
 

	
 
    #         d.addCallback(done)
 

	
light9/effect/sequencer/service.py
Show inline comments
 
@@ -6,77 +6,77 @@ import asyncio
 
import json
 
import logging
 
import time
 

	
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.applications import Starlette
 
from starlette.routing import Route
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
from lib.background_loop import loop_forever
 
from light9 import networking
 
from light9.collector.collector_client_asyncio import sendToCollector
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.effect.sequencer.eval_faders import FaderEval
 
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
 
from light9.run_local import log
 

	
 
RATE = 20
 

	
 

	
 
async def changes():
 
    state = {}
 
    q = asyncio.Queue()
 

	
 
    def onBroadcast(update):
 
        state.update(update)
 
        q.put_nowait(None)
 

	
 
    dispatcher.connect(onBroadcast, StateUpdate)
 

	
 
    lastSend = 0
 
    while True:
 
        await q.get()
 
        now = time.time()
 
        if now > lastSend + .2:
 
            lastSend = now
 
            yield json.dumps(state)
 

	
 

	
 
async def send_page_updates(request):
 
    return EventSourceResponse(changes())
 

	
 

	
 
def main():
 
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.DEBUG)
 
    logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)
 
    logging.getLogger('effecteval').setLevel(logging.INFO)
 

	
 
    # seq = Sequencer(graph, send)  # per-song timed notes
 
    lib = EffectFunctionLibrary(graph)
 
    faders = FaderEval(graph, lib)  # bin/fade's untimed effects
 

	
 
    #@metrics('computeAndSend').time() # needs rework with async
 
    async def update(first_run):
 
        ds = faders.computeOutput()
 
        await sendToCollector('effectSequencer', session='0', settings=ds)
 

	
 
    faders_loop = loop_forever(func=update, metric_prefix='faders', sleep_period=1 / RATE)
 

	
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            Route('/updates', endpoint=send_page_updates),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    return app
 

	
 

	
 
app = main()
0 comments (0 inline, 0 general)