Files @ 4b8f8fabeb2f
Branch filter:

Location: light9/src/light9/ascoltami/webapp.py

drewp@bigasterisk.com
vscode settings
import asyncio
import json
import logging
import socket
import subprocess
import time
from typing import cast

from rdflib import RDFS, Graph, URIRef
from light9.ascoltami.player import Player
from sse_starlette.sse import EventSourceResponse
from starlette.requests import Request
from starlette.responses import JSONResponse, PlainTextResponse

from light9.namespaces import L9
from light9.showconfig import getSongsFromShow, showUri, songOnDisk

# getLogger() is called with no name, so this is the root logger.
log = logging.getLogger()
# Reverse lookup table: songLocation() records each file:// location it hands
# out here, and songUri() reads it back to recover the song.
_songUris = {}  # locationUri : song


def songLocation(graph, songUri):
    """Return the file:// location URIRef for a song and remember the mapping.

    The location is built from whatever songOnDisk(songUri) returns. The
    location -> song pair is stored in the module-level _songUris table so
    that the song can be looked up from its location later.

    `graph` is accepted but not used here.
    """
    disk_path = songOnDisk(songUri)
    location = URIRef("file://%s" % disk_path)
    _songUris[location] = songUri
    return location


def songUri(graph, locationUri):
    """Return the song that was registered for `locationUri`.

    Looks the location up in the module-level _songUris table; a location
    that is not in the table raises KeyError. `graph` is accepted but not
    used here.
    """
    registered_song = _songUris[locationUri]
    return registered_song


async def get_config(request: Request) -> JSONResponse:
    """Return this host's name, the show URI, and display timings as JSON."""
    payload = {
        "host": socket.gethostname(),
        "show": str(showUri()),
    }
    # these are just for the web display. True values are on Player.__init__
    payload["times"] = {'intro': 4, 'post': 0}
    return JSONResponse(payload)


def playerSongUri(graph, player):
    """Return the song URI for the player's current song, or None.

    None is returned when player.getSong() gives back a falsy value;
    otherwise that value is wrapped in a URIRef and resolved via songUri().
    """
    location = player.getSong()
    if not location:
        return None
    return songUri(graph, URIRef(location))


def currentState(graph, player):
    """Build a dict snapshot of the player's current state.

    Keys: "song", "started", "duration", "playing", "t", "state" and "next".
    "next" is 'finish' when the player is autostopped, 'disabled' when it is
    playing, and 'play' otherwise.
    """
    if player.isAutostopped():
        upcoming = 'finish'
    else:
        upcoming = 'disabled' if player.isPlaying() else 'play'

    # Read the player fields in a fixed order, one query per key.
    song = playerSongUri(graph, player)
    started = player.playStartTime
    duration = player.duration()
    playing = player.isPlaying()
    position = player.currentTime()
    states = player.states()

    return {
        "song": song,
        "started": started,
        "duration": duration,
        "playing": playing,
        "t": position,
        "state": states,
        "next": upcoming,
    }


async def get_time(request: Request) -> JSONResponse:
    """Return the currentState() snapshot of the app's player as JSON."""
    app_state = request.app.state
    player = cast(Player, app_state.player)
    graph = cast(Graph, app_state.graph)
    snapshot = currentState(graph, player)
    return JSONResponse(snapshot)


async def post_time(request: Request) -> PlainTextResponse:
    """
    Apply pause / resume / seek commands from a posted json object.

    Send {pause: true} or {resume: true} for those actions, and
    {t: <seconds>} to seek; a seek may be combined with a pause/resume
    in the same object. Pause is handled first, then resume, then seek.
    """
    params = await request.json()
    player = cast(Player, request.app.state.player)

    # Each truthy flag triggers its matching player method, in this order.
    flag_actions = (
        ('pause', player.pause),
        ('resume', player.resume),
    )
    for flag, action in flag_actions:
        if params.get(flag, False):
            action()

    if 't' in params:
        player.seek(params['t'])
    return PlainTextResponse("ok")


async def timeStream(request: Request):
    """Stream currentState() snapshots to the client as server-sent events.

    The state is sampled every 0.1 seconds. A JSON-encoded snapshot is
    emitted whenever it differs from the last one sent, or when more than
    2 seconds have passed since the last send.
    """
    app_state = request.app.state
    graph = cast(Graph, app_state.graph)
    player = cast(Player, app_state.player)

    async def event_generator():
        previous = None
        previous_at = 0.0

        while True:
            tick = time.time()
            snapshot = currentState(graph, player)

            changed = snapshot != previous
            overdue = tick > previous_at + 2
            if changed or overdue:
                yield json.dumps(snapshot)
                previous, previous_at = snapshot, tick

            await asyncio.sleep(0.1)

    return EventSourceResponse(event_generator())


async def get_songs(request: Request) -> JSONResponse:
    """Return the show's songs as JSON: {"songs": [{uri, path, label}, ...]}.

    Songs come from getSongsFromShow() for the app's show; "path" is the
    song's L9 songFilename value and "label" its RDFS.label value, both
    read from the graph.
    """
    graph = cast(Graph, request.app.state.graph)

    songs_data = []
    for song in getSongsFromShow(graph, request.app.state.show):
        entry = {
            "uri": song,
            "path": graph.value(song, L9['songFilename']),
            "label": graph.value(song, RDFS.label),
        }
        songs_data.append(entry)

    return JSONResponse({"songs": songs_data})


async def post_song(request: Request) -> PlainTextResponse:
    """Switch the player to the song whose uri is the request body.

    The body is decoded as utf8 and used as the song uri; the player is
    then given that song's location from songLocation().
    """
    app_state = request.app.state
    graph = cast(Graph, app_state.graph)
    player = cast(Player, app_state.player)

    raw_body = await request.body()
    song_uri = URIRef(raw_body.decode('utf8'))
    location = songLocation(graph, song_uri)
    player.setSong(location)

    return PlainTextResponse("ok")


async def post_seekPlayOrPause(request: Request) -> PlainTextResponse:
    """Handle curveCalc's ctrl-p or a vidref scrub.

    The posted json object is interpreted by the first rule that matches:

    * 'scrub' present: pause, then seek to that value.
    * 'action' present: 'play' resumes, 'pause' pauses, anything else
      raises NotImplementedError.
    * otherwise toggle: pause if playing, else seek to 't' and resume.
    """
    player = cast(Player, request.app.state.player)
    data = await request.json()

    if 'scrub' in data:
        player.pause()
        player.seek(data['scrub'])
    elif 'action' in data:
        action = data['action']
        if action == 'play':
            player.resume()
        elif action == 'pause':
            player.pause()
        else:
            raise NotImplementedError
    elif player.isPlaying():
        player.pause()
    else:
        player.seek(data['t'])
        player.resume()

    return PlainTextResponse("ok")


async def post_output(request: Request) -> PlainTextResponse:
    """Run bin/movesinks with the sink given in the posted json object.

    The body must be a json object with a 'sink' key; its value is
    stringified and passed as the only argument to bin/movesinks.

    Raises:
        KeyError: the posted object has no 'sink' key.
        subprocess.CalledProcessError: bin/movesinks exited non-zero.
    """
    d = await request.json()
    command = ["bin/movesinks", str(d['sink'])]

    # check_call blocks until the child process exits. Calling it directly
    # in this coroutine would stall the event loop (and every other request
    # and event stream it serves), so run it on the default executor and
    # await the result. Exceptions from check_call still propagate from here.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, subprocess.check_call, command)
    return PlainTextResponse("ok")


async def post_goButton(request: Request) -> PlainTextResponse:
    """
    Resume the player when it is autostopped or not playing.

    If music is playing (and the player is not autostopped), this
    silently does nothing.
    """
    player = cast(Player, request.app.state.player)

    # Short-circuit keeps the original query order: isPlaying() is only
    # consulted when the player is not autostopped.
    if player.isAutostopped() or not player.isPlaying():
        player.resume()
    return PlainTextResponse("ok")