Changeset - f6e40619cf27
[Not reviewed]
default
0 2 0
drewp@bigasterisk.com - 8 months ago 2024-05-23 23:42:17
drewp@bigasterisk.com
reformat
2 files changed with 14 insertions and 20 deletions:
0 comments (0 inline, 0 general)
src/light9/ascoltami/player.py
Show inline comments
 
#!/usr/bin/python
 
"""
 
alternate to the mpd music player, for ascoltami
 
"""
 

	
 
import time, logging, traceback
 
from gi.repository import Gst # type: ignore
 
import logging
 
import time
 
import traceback
 
from dataclasses import dataclass
 
from pathlib import Path
 

	
 
from gi.repository import Gst
 
from rdflib import URIRef
 
from twisted.internet import task
 
from light9.metrics import metrics
 
log = logging.getLogger()
 

	
 

	
 

	
 
class Player:
 

	
 
    def __init__(self, autoStopOffset=4, onEOS=None):
 
        """autoStopOffset is the number of seconds before the end of
 
        song before automatically stopping (which is really pausing).
 
        onEOS is an optional function to be called when we reach the
 
        end of a stream (for example, can be used to advance the song).
 
        It is called with one argument which is the URI of the song that
 
        just finished."""
 
        self.autoStopOffset = autoStopOffset
 
        self.playbin = self.pipeline = Gst.ElementFactory.make('playbin', None)
 

	
 
        self._playStartTime = 0
 
        self._lastWatchTime = 0
 
        self._autoStopTime = 0
 
        self._lastSetSongUri = None
 
        self._onEOS = onEOS
 

	
 
        task.LoopingCall(self.watchTime).start(.050)
 

	
 
        #bus = self.pipeline.get_bus()
 
        # not working- see notes in pollForMessages
 
        #self.watchForMessages(bus)
 

	
 
    def watchTime(self):
 
        try:
 
            self.pollForMessages()
 

	
 
            t = self.currentTime()
 
            log.debug("watch %s < %s < %s", self._lastWatchTime,
 
                      self._autoStopTime, t)
 
            log.debug("watch %s < %s < %s", self._lastWatchTime, self._autoStopTime, t)
 
            if self._lastWatchTime < self._autoStopTime < t:
 
                log.info("autostop")
 
                self.pause()
 

	
 
            self._lastWatchTime = t
 
        except Exception:
 
            traceback.print_exc()
 

	
 
    def watchForMessages(self, bus):
 
        """this would be nicer than pollForMessages but it's not working for
 
        me. It's like add_signal_watch isn't running."""
 
        bus.add_signal_watch()
 

	
 
        def onEos(*args):
 
            print("onEos", args)
 
            if self._onEOS is not None:
 
                self._onEOS(self.getSong())
 

	
 
        bus.connect('message::eos', onEos)
 

	
 
        def onStreamStatus(bus, message):
 
            print("streamstatus", bus, message)
 
            (statusType, _elem) = message.parse_stream_status()
 
            if statusType == Gst.StreamStatusType.ENTER:
 
                self.setupAutostop()
 

	
 
        bus.connect('message::stream-status', onStreamStatus)
 

	
 
    def pollForMessages(self):
 
        """bus.add_signal_watch seems to be having no effect, but this works"""
 
        bus = self.pipeline.get_bus()
 
        mt = Gst.MessageType
 
        msg = bus.poll(
 
            mt.EOS | mt.STREAM_STATUS | mt.ERROR,  # | mt.ANY,
 
            0)
 
        if msg is not None:
 
            log.debug("bus message: %r %r", msg.src, msg.type)
 
            # i'm trying to catch here a case where the pulseaudio
 
            # output has an error, since that's otherwise kind of
 
            # mysterious to diagnose. I don't think this is exactly
 
            # working.
 
            if msg.type == mt.ERROR:
 
                log.error(repr(msg.parse_error()))
 
            if msg.type == mt.EOS:
 
                if self._onEOS is not None:
 
                    self._onEOS(self.getSong())
 
            if msg.type == mt.STREAM_STATUS:
 
                (statusType, _elem) = msg.parse_stream_status()
 
                if statusType == Gst.StreamStatusType.ENTER:
 
                    self.setupAutostop()
 

	
 
    def seek(self, t):
 
        isSeekable = self.playbin.seek_simple(
 
            Gst.Format.TIME,
 
            Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE | Gst.SeekFlags.SKIP,
 
            t * Gst.SECOND)
 
        isSeekable = self.playbin.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE | Gst.SeekFlags.SKIP, t * Gst.SECOND)
 
        if not isSeekable:
 
            raise ValueError('seek_simple failed')
 
        self._playStartTime = time.time()
 

	
 
    def setSong(self, songLoc, play=True):
 
        """
 
        uri like file:///my/proj/light9/show/dance2010/music/07.wav
 
        """
 
        log.info("set song to %r" % songLoc)
 
        self.pipeline.set_state(Gst.State.READY)
 
        self.preload(songLoc)
 
        self.pipeline.set_property("uri", songLoc)
 
        self._lastSetSongUri = songLoc
 
        # todo: don't have any error report yet if the uri can't be read
 
        if play:
 
            self.pipeline.set_state(Gst.State.PLAYING)
 
            self._playStartTime = time.time()
 

	
 
    def getSong(self):
 
        """Returns the URI of the current song."""
 
        # even the 'uri' that I just set isn't readable yet
 
        return self.playbin.get_property("uri") or self._lastSetSongUri
 

	
 
    def preload(self, songPath):
 
        """
 
        to avoid disk seek stutters, which happened sometimes (in 2007) with the
 
        non-gst version of this program, we read the whole file to get
 
        more OS caching.
 

	
 
        i don't care that it's blocking.
 
        """
 
        log.info("preloading %s", songPath)
 
        assert songPath.startswith("file://"), songPath
 
        try:
 
            open(songPath[len("file://"):], 'rb').read()
 
        except IOError as e:
 
            log.error("couldn't preload %s, %r", songPath, e)
 
            raise
 

	
 
    @metrics('current_time').time()
 
    def currentTime(self):
 
        success, cur = self.playbin.query_position(Gst.Format.TIME)
 
        if not success:
 
            return 0
 
        return cur / Gst.SECOND
 

	
 
    def duration(self):
 
        success, dur = self.playbin.query_duration(Gst.Format.TIME)
 
        if not success:
 
            return 0
 
        return dur / Gst.SECOND
 

	
 
    def states(self):
 
        """json-friendly object describing the interesting states of
 
        the player nodes"""
 
        success, state, pending = self.playbin.get_state(timeout=0)
 
        return {
 
            "current": {
 
                "name": state.value_nick
 
            },
 
            "pending": {
 
                "name": state.value_nick
 
            }
 
        }
 
        return {"current": {"name": state.value_nick}, "pending": {"name": state.value_nick}}
 

	
 
    def pause(self):
 
        self.pipeline.set_state(Gst.State.PAUSED)
 

	
 
    def isAutostopped(self):
 
        """
 
        are we stopped at the autostop time?
 
        """
 
        if self.autoStopOffset < .01:
 
            return False
 
        pos = self.currentTime()
 
        autoStop = self.duration() - self.autoStopOffset
 
        return not self.isPlaying() and abs(
 
            pos - autoStop) < 1  # i've seen .4 difference here
 
        return not self.isPlaying() and abs(pos - autoStop) < 1  # i've seen .4 difference here
 

	
 
    def resume(self):
 
        self.pipeline.set_state(Gst.State.PLAYING)
 

	
 
    def setupAutostop(self):
 
        dur = self.duration()
 
        if dur == 0:
 
            raise ValueError("duration=0, can't set autostop")
 
        self._autoStopTime = (dur - self.autoStopOffset)
 
        log.info("autostop will be at %s", self._autoStopTime)
 
        # pipeline.seek can take a stop time, but using that wasn't
 
        # working out well. I'd get pauses at other times that were
 
        # hard to remove.
 

	
 
    def isPlaying(self):
 
        _, state, _ = self.pipeline.get_state(timeout=0)
 
        return state == Gst.State.PLAYING
src/light9/ascoltami/webapp.py
Show inline comments
 
import asyncio
 
import json
 
import logging
 
import socket
 
import subprocess
 
import time
 
from typing import cast
 

	
 
from rdflib import RDFS, Graph, URIRef
 
from light9.ascoltami.player import Player
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.requests import Request
 
from starlette.responses import JSONResponse, PlainTextResponse
 

	
 
from light9.ascoltami.player import Player
 
from light9.namespaces import L9
 
from light9.showconfig import getSongsFromShow, showUri, songOnDisk
 

	
 
log = logging.getLogger()
 
_songUris = {}  # locationUri : song
 

	
 

	
 
def songLocation(graph, songUri):
 
    loc = URIRef("file://%s" % songOnDisk(songUri))
 
    _songUris[loc] = songUri
 
    return loc
 

	
 

	
 
def songUri(graph, locationUri):
 
    return _songUris[locationUri]
 

	
 

	
 
async def get_config(request: Request) -> JSONResponse:
 
    return JSONResponse(
 
        dict(
 
            host=socket.gethostname(),
 
            show=str(showUri()),
 
            times={
 
                # these are just for the web display. True values are on Player.__init__
 
                'intro': 4,
 
                'post': 0
 
            }))
 

	
 

	
 
def playerSongUri(graph, player):
 
    """or None"""
 

	
 
    playingLocation = player.getSong()
 
    if playingLocation:
 
        return songUri(graph, URIRef(playingLocation))
 
    else:
 
        return None
 

	
 

	
 
def currentState(graph, player):
 
    if player.isAutostopped():
 
        nextAction = 'finish'
 
    elif player.isPlaying():
 
        nextAction = 'disabled'
 
    else:
 
        nextAction = 'play'
 

	
 
    return {
 
        "song": playerSongUri(graph, player),
 
        "started": player.playStartTime,
 
        "duration": player.duration(),
 
        "playing": player.isPlaying(),
 
        "t": player.currentTime(),
 
        "state": player.states(),
 
        "next": nextAction,
 
    }
 

	
 

	
 
async def get_time(request: Request) -> JSONResponse:
 
    player = cast(Player, request.app.state.player)
 
    graph = cast(Graph, request.app.state.graph)
 
    return JSONResponse(currentState(graph, player))
 

	
 

	
 
async def post_time(request: Request) -> PlainTextResponse:
 
    """
 
    post a json object with {pause: true} or {resume: true} if you
 
    want those actions. Use {t: <seconds>} to seek, optionally
 
    with a pause/resume command too.
 
    """
 
    params = await request.json()
 
    player = cast(Player, request.app.state.player)
 
    if params.get('pause', False):
 
        player.pause()
 
    if params.get('resume', False):
 
        player.resume()
 
    if 't' in params:
 
        player.seek(params['t'])
 
    return PlainTextResponse("ok")
 

	
 

	
 
async def timeStream(request: Request):
 
    graph = cast(Graph, request.app.state.graph)
 
    player = cast(Player, request.app.state.player)
 

	
 
    async def event_generator():
 
        last_sent = None
 
        last_sent_time = 0.0
 

	
 
        while True:
 
            now = time.time()
 
            msg = currentState(graph, player)
 
            if msg != last_sent or now > last_sent_time + 2:
 
                event_data = json.dumps(msg)
 
                yield event_data
 
                last_sent = msg
 
                last_sent_time = now
 

	
 
            await asyncio.sleep(0.1)
 

	
 
    return EventSourceResponse(event_generator())
 

	
 

	
 
async def get_songs(request: Request) -> JSONResponse:
 
    graph = cast(Graph, request.app.state.graph)
 

	
 
    songs = getSongsFromShow(graph, request.app.state.show)
 

	
 
    songs_data = [
 
        {  #
 
            "uri": s,
 
            "path": graph.value(s, L9['songFilename']),
 
            "label": graph.value(s, RDFS.label)
 
        } for s in songs
 
    ]
 

	
 
    return JSONResponse({"songs": songs_data})
 

	
 

	
 
async def post_song(request: Request) -> PlainTextResponse:
 
    """post a uri of song to switch to (and start playing)"""
 
    graph = cast(Graph, request.app.state.graph)
 
    player = cast(Player, request.app.state.player)
 

	
 
    song_uri = URIRef((await request.body()).decode('utf8'))
 
    player.setSong(songLocation(graph, song_uri))
 

	
 
    return PlainTextResponse("ok")
 

	
 

	
 
async def post_seekPlayOrPause(request: Request) -> PlainTextResponse:
 
    """curveCalc's ctrl-p or a vidref scrub"""
 
    player = cast(Player, request.app.state.player)
0 comments (0 inline, 0 general)