Changeset - bea44be4df38
[Not reviewed]
default
0 2 0
drewp@bigasterisk.com - 20 months ago 2023-06-01 21:17:33
drewp@bigasterisk.com
log levels
2 files changed with 3 insertions and 1 deletions:
0 comments (0 inline, 0 general)
light9/collector/service.py
Show inline comments
 
@@ -13,150 +13,152 @@ import subprocess
 
import traceback
 
from typing import List
 

	
 
from light9 import networking
 
from light9.collector.collector import Collector
 
from light9.collector.output import ArtnetDmx, DummyOutput, Output, Udmx  # noqa
 
from light9.collector.weblisteners import UiListener, WebListeners
 
from light9.namespaces import L9
 
from light9.run_local import log
 
from light9.zmqtransport import parseJsonMessage
 
from prometheus_client import Summary
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from starlette.applications import Starlette
 
from starlette.endpoints import WebSocketEndpoint
 
from starlette.requests import ClientDisconnect
 
from starlette.responses import Response
 
from starlette.routing import Route, WebSocketRoute
 
from starlette.types import Receive, Scope, Send
 
from starlette.websockets import WebSocket
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
import zmq
 
import zmq.asyncio
 

	
 
STAT_SETATTR = Summary('set_attr', 'setAttr calls')
 

	
 
# this is the rate sent to usb
 
RATE = 20
 

	
 

	
 
class Updates(WebSocketEndpoint, UiListener):
 

	
 
    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
 
        super().__init__(scope, receive, send)
 
        self.listeners = listeners
 

	
 
    async def on_connect(self, websocket: WebSocket):
 
        await websocket.accept()
 
        log.info('socket connect %s', self.scope['client'])
 
        self.websocket = websocket
 
        self.listeners.addClient(self)
 

	
 
    async def sendMessage(self, msgText):
 
        await self.websocket.send_text(msgText)
 

	
 
    # async def on_receive(self, websocket, data):
 
    #     json.loads(data)
 

	
 
    async def on_disconnect(self, websocket: WebSocket, close_code: int):
 
        self.listeners.delClient(self)
 

	
 
    pass
 

	
 

	
 
async def PutAttrs(collector: Collector, request):
 
    with STAT_SETATTR.time():
 
        try:
 
            body = await request.body()
 
        except ClientDisconnect:
 
            log.warning("PUT /attrs request disconnected- ignoring")
 
            return Response('', status_code=400)
 
        client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, body)
 
        collector.setAttrs(client, clientSession, settings, sendTime)
 
        return Response('', status_code=202)
 

	
 

	
 
async def zmqListener(collector):
 
    try:
 
        ctx = zmq.asyncio.Context()
 
        sock = ctx.socket(zmq.SUB)
 
        sock.bind('tcp://127.0.0.1:9203')
 
        sock.subscribe(b'setAttr')
 
        while True:
 
            [topic, msg] = await sock.recv_multipart()
 
            if topic != b'setAttr':
 
                raise ValueError(topic)
 
            # log.info(f'zmq recv {len(msg)}')
 
            client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    except:
 
        traceback.print_exc()
 
        raise
 

	
 
def findDevice():
 
    for line in subprocess.check_output("lsusb").decode('utf8').splitlines():
 
        if '16c0:05dc' in line:
 
            words = line.split(':')[0].split()
 
            dev = f'/dev/bus/usb/{words[1]}/{words[3]}'
 
            log.info(f'device will be {dev}')
 
            return dev ,int(words[3])
 
    raise ValueError("no matching uDMX found")
 

	
 
def main():
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('output.allDmx').setLevel(logging.WARNING)
 
    logging.getLogger().setLevel(logging.DEBUG)
 
    logging.getLogger('collector').setLevel(logging.DEBUG)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
    devPath, usbAddress = findDevice()
 
            # if user doesn't have r/w, fail now
 
    try:
 
        # todo: drive outputs with config files
 
        outputs: List[Output] = [
 
            # ArtnetDmx(L9['output/dmxA/'],
 
            #           host='127.0.0.1',
 
            #           port=6445,
 
            #           rate=rate),
 
            #sudo chmod a+rw /dev/bus/usb/003/021
 
            Udmx(L9['output/dmxA/'], bus=1, address=usbAddress, lastDmxChannel=200, rate=RATE),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
 
    listeners = WebListeners()
 
    c = Collector(graph, outputs, listeners)
 
    zl = asyncio.create_task(zmqListener(c))
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
 
            WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
 
            Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']),
 
        ],
 
    )
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    # loadtest = os.environ.get('LOADTEST', False)  # call myself with some synthetic load then exit
 
    # if loadtest:
 
    #     # in a subprocess since we don't want this client to be
 
    #     # cooperating with the main event loop and only sending
 
    #     # requests when there's free time
 
    #     def afterWarmup():
 
    #         log.info('running collector_loadtest')
 
    #         d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py'])
 

	
 
    #         def done(*a):
 
    #             log.info('loadtest done')
 
    #             reactor.stop()
 

	
 
    #         d.addCallback(done)
 

	
 
    #     reactor.callLater(2, afterWarmup)
 

	
 
    return app
 

	
 

	
 
app = main()
light9/effect/sequencer/service.py
Show inline comments
 
"""
 
plays back effect notes from the timeline (and an untimed note from the faders)
 
"""
 

	
 
import asyncio
 
import json
 
import logging
 
import time
 

	
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.applications import Starlette
 
from starlette.routing import Route
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
from lib.background_loop import loop_forever
 
from light9 import networking
 
from light9.collector.collector_client_asyncio import sendToCollector
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.effect.sequencer.eval_faders import FaderEval
 
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
 
from light9.run_local import log
 

	
 
RATE = 20
 

	
 

	
 
async def changes():
 
    state = {}
 
    q = asyncio.Queue()
 

	
 
    def onBroadcast(update):
 
        state.update(update)
 
        q.put_nowait(None)
 

	
 
    dispatcher.connect(onBroadcast, StateUpdate)
 

	
 
    lastSend = 0
 
    while True:
 
        await q.get()
 
        now = time.time()
 
        if now > lastSend + .2:
 
            lastSend = now
 
            yield json.dumps(state)
 

	
 

	
 
async def send_page_updates(request):
 
    return EventSourceResponse(changes())
 

	
 

	
 
def main():
 
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.DEBUG)
 
    logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)
 
    logging.getLogger('effecteval').setLevel(logging.INFO)
 

	
 
    # seq = Sequencer(graph, send)  # per-song timed notes
 
    lib = EffectFunctionLibrary(graph)
 
    faders = FaderEval(graph, lib)  # bin/fade's untimed effects
 

	
 
    #@metrics('computeAndSend').time() # needs rework with async
 
    async def update(first_run):
 
        ds = faders.computeOutput()
 
        await sendToCollector('effectSequencer', session='0', settings=ds)
 

	
 
    faders_loop = loop_forever(func=update, metric_prefix='faders', sleep_period=1 / RATE)
 

	
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            Route('/updates', endpoint=send_page_updates),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    return app
 

	
 

	
 
app = main()
0 comments (0 inline, 0 general)