Files @ 623836db99af
Branch filter:

Location: light9/light9/vidref/videorecorder.py - annotation

drewp@bigasterisk.com
fix ts warning
82e98aa4d159
ec816fd31c83
82e98aa4d159
9f0f2b39ad95
0dc3715050cf
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
82e98aa4d159
6f49dc917aa3
ec816fd31c83
947b8267652e
947b8267652e
ec816fd31c83
0dc3715050cf
0dc3715050cf
82e98aa4d159
ec816fd31c83
0dc3715050cf
0dc3715050cf
9aa046cc9b33
b0337e6f68f1
b0337e6f68f1
ec816fd31c83
9f0f2b39ad95
0dc3715050cf
9f0f2b39ad95
0dc3715050cf
0dc3715050cf
0dc3715050cf
9f0f2b39ad95
9f0f2b39ad95
9aa046cc9b33
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
0dc3715050cf
ec816fd31c83
3ae1e7f8db23
3ae1e7f8db23
3ae1e7f8db23
3ae1e7f8db23
3ae1e7f8db23
3ae1e7f8db23
3ae1e7f8db23
ec816fd31c83
3ae1e7f8db23
3ae1e7f8db23
3ae1e7f8db23
3ae1e7f8db23
ec816fd31c83
ec816fd31c83
ec816fd31c83
ec816fd31c83
3ae1e7f8db23
9aa046cc9b33
3ae1e7f8db23
3ae1e7f8db23
82e98aa4d159
ec816fd31c83
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
82e98aa4d159
9f0f2b39ad95
0dc3715050cf
9f0f2b39ad95
82e98aa4d159
0dc3715050cf
3ae1e7f8db23
82e98aa4d159
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
9f0f2b39ad95
9f0f2b39ad95
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
82e98aa4d159
82e98aa4d159
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
82e98aa4d159
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
0dc3715050cf
3ae1e7f8db23
748ecc2d1f2a
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
0dc3715050cf
3ae1e7f8db23
3ae1e7f8db23
0dc3715050cf
748ecc2d1f2a
748ecc2d1f2a
748ecc2d1f2a
748ecc2d1f2a
748ecc2d1f2a
0dc3715050cf
0dc3715050cf
3ae1e7f8db23
3ae1e7f8db23
9f0f2b39ad95
82e98aa4d159
82e98aa4d159
82e98aa4d159
3ae1e7f8db23
82e98aa4d159
0dc3715050cf
82e98aa4d159
82e98aa4d159
0dc3715050cf
0dc3715050cf
82e98aa4d159
3ae1e7f8db23
3ae1e7f8db23
3ae1e7f8db23
3ae1e7f8db23
0dc3715050cf
9aa046cc9b33
0dc3715050cf
748ecc2d1f2a
748ecc2d1f2a
748ecc2d1f2a
748ecc2d1f2a
748ecc2d1f2a
748ecc2d1f2a
748ecc2d1f2a
0dc3715050cf
748ecc2d1f2a
9ee42b88299b
0dc3715050cf
9f0f2b39ad95
9aa046cc9b33
0dc3715050cf
0dc3715050cf
82e98aa4d159
3ae1e7f8db23
0dc3715050cf
7772cc48e016
82e98aa4d159
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
0dc3715050cf
9f0f2b39ad95
9f0f2b39ad95
6f49dc917aa3
df28c994365d
df28c994365d
6f49dc917aa3
6f49dc917aa3
82e98aa4d159
6f49dc917aa3
6f49dc917aa3
0dc3715050cf
df28c994365d
df28c994365d
0dc3715050cf
0dc3715050cf
82e98aa4d159
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
0dc3715050cf
7772cc48e016
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
9f0f2b39ad95
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
9ee42b88299b
6f49dc917aa3
6f49dc917aa3
0dc3715050cf
0dc3715050cf
0dc3715050cf
9aa046cc9b33
0dc3715050cf
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
9f0f2b39ad95
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
9aa046cc9b33
9ee42b88299b
df28c994365d
ec816fd31c83
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
9f0f2b39ad95
6f49dc917aa3
6f49dc917aa3
6f49dc917aa3
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
7772cc48e016
1d9547f90737
9f0f2b39ad95
1d9547f90737
1d9547f90737
1d9547f90737
7772cc48e016
1d9547f90737
1d9547f90737
1d9547f90737
7772cc48e016
b0337e6f68f1
b0337e6f68f1
b0337e6f68f1
7772cc48e016
b0337e6f68f1
b0337e6f68f1
7772cc48e016
b0337e6f68f1
b0337e6f68f1
b0337e6f68f1
b0337e6f68f1
b0337e6f68f1
3aae87f6777a
3aae87f6777a
3aae87f6777a
3aae87f6777a
3aae87f6777a
3aae87f6777a
3aae87f6777a
3aae87f6777a
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
1d9547f90737
7772cc48e016
b0337e6f68f1
b0337e6f68f1
b0337e6f68f1
b0337e6f68f1
b0337e6f68f1
b0337e6f68f1
6f49dc917aa3
from dataclasses import dataclass
from io import BytesIO
from typing import Optional
import time, logging, os, traceback

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')

from gi.repository import Gst
from rdflib import URIRef
from rx.subject import BehaviorSubject
from twisted.internet import threads
import PIL.Image
import moviepy.editor
import numpy

from light9 import showconfig
from light9.ascoltami.musictime_client import MusicTime
from light9.newtypes import Song
from light9.metrics import metrics
log = logging.getLogger()


@dataclass
class CaptureFrame:
    img: PIL.Image
    song: Song
    t: float
    isPlaying: bool
    imgJpeg: Optional[bytes] = None

    @metrics('jpeg_encode').time()
    def asJpeg(self):
        if not self.imgJpeg:
            output = BytesIO()
            self.img.save(output, 'jpeg', quality=80)
            self.imgJpeg = output.getvalue()
        return self.imgJpeg


def songDir(song: Song) -> bytes:
    return os.path.join(
        showconfig.root(), b'video',
        song.replace('http://', '').replace('/', '_').encode('ascii'))


def takeUri(songPath: bytes) -> URIRef:
    p = songPath.decode('ascii').split('/')
    take = p[-1].replace('.mp4', '')
    song = p[-2].split('_')
    return URIRef('/'.join(
        ['http://light9.bigasterisk.com/show', song[-2], song[-1], take]))


def deleteClip(uri: URIRef):
    # uri http://light9.bigasterisk.com/show/dance2019/song6/take_155
    # path show/dance2019/video/light9.bigasterisk.com_show_dance2019_song6/take_155.*
    w = uri.split('/')[-4:]
    path = '/'.join([
        w[0], w[1], 'video', f'light9.bigasterisk.com_{w[0]}_{w[1]}_{w[2]}',
        w[3]
    ])
    log.info(f'deleting {uri} {path}')
    metrics('deletes').incr()
    for fn in [path + '.mp4', path + '.timing']:
        os.remove(fn)


class FramesToVideoFiles:
    """

    nextWriteAction: 'ignore'
    currentOutputClip: None

    (frames come in for new video)
    nextWriteAction: 'saveFrame'
    currentOutputClip: new VideoClip
    (many frames)

    (music stops or song changes)
    nextWriteAction: 'close'
    currentOutputClip: None
    nextWriteAction: 'ignore'
    
    """

    def __init__(self, frames: BehaviorSubject, root: bytes):
        self.frames = frames
        self.root = root
        self.nextImg: Optional[CaptureFrame] = None

        self.currentOutputClip: Optional[moviepy.editor.VideoClip] = None
        self.currentOutputSong: Optional[Song] = None
        self.nextWriteAction = 'ignore'
        self.frames.subscribe(on_next=self.onFrame)

    def onFrame(self, cf: Optional[CaptureFrame]):
        if cf is None:
            return
        self.nextImg = cf

        if self.currentOutputClip is None and cf.isPlaying:
            # start up
            self.nextWriteAction = 'saveFrames'
            self.currentOutputSong = cf.song
            self.save(
                os.path.join(songDir(cf.song), b'take_%d' % int(time.time())))
        elif self.currentOutputClip and cf.isPlaying:
            self.nextWriteAction = 'saveFrames'
            # continue recording this
        elif self.currentOutputClip is None and not cf.isPlaying:
            self.nextWriteAction = 'notWritingClip'
            pass  # continue waiting
        elif self.currentOutputClip and not cf.isPlaying or self.currentOutputSong != cf.song:
            # stop
            self.nextWriteAction = 'close'
        else:
            raise NotImplementedError(str(vars()))

    def save(self, outBase):
        """
        receive frames (infinite) and wall-to-song times (stream ends with
        the song), and write a video file and a frame map
        """
        return threads.deferToThread(self._bg_save, outBase)

    def _bg_save(self, outBase: bytes):
        os.makedirs(os.path.dirname(outBase), exist_ok=True)
        self.frameMap = open(outBase + b'.timing', 'wt')

        # todo: see moviestore.py for a better-looking version where
        # we get to call write_frame on a FFMPEG_VideoWriter instead
        # of it calling us back.

        self.currentClipFrameCount = 0

        # (immediately calls make_frame)
        self.currentOutputClip = moviepy.editor.VideoClip(self._bg_make_frame,
                                                          duration=999.)
        # The fps recorded in the file doesn't matter much; we'll play
        # it back in sync with the music regardless.
        self.currentOutputClip.fps = 10
        log.info(f'write_videofile {outBase} start')
        try:
            self.outMp4 = outBase.decode('ascii') + '.mp4'
            self.currentOutputClip.write_videofile(self.outMp4,
                                                   codec='libx264',
                                                   audio=False,
                                                   preset='ultrafast',
                                                   logger=None,
                                                   ffmpeg_params=['-g', '10'],
                                                   bitrate='150000')
        except (StopIteration, RuntimeError):
            self.frameMap.close()

        log.info('write_videofile done')
        self.currentOutputClip = None

        if self.currentClipFrameCount < 400:
            log.info('too small- deleting')
            deleteClip(takeUri(self.outMp4.encode('ascii')))

    def _bg_make_frame(self, video_time_secs):
        metrics('encodeFrameFps').incr()
        if self.nextWriteAction == 'close':
            raise StopIteration  # the one in write_videofile
        elif self.nextWriteAction == 'notWritingClip':
            raise NotImplementedError
        elif self.nextWriteAction == 'saveFrames':
            pass
        else:
            raise NotImplementedError(self.nextWriteAction)

        # should be a little queue to miss fewer frames
        t1 = time.time()
        while self.nextImg is None:
            time.sleep(.015)
        metrics('wait_for_next_img').observe(time.time() - t1)
        cf, self.nextImg = self.nextImg, None

        self.frameMap.write(f'video {video_time_secs:g} = song {cf.t:g}\n')
        self.currentClipFrameCount += 1
        return numpy.asarray(cf.img)


class GstSource:

    def __init__(self, dev):
        """
        make new gst pipeline
        """
        Gst.init(None)
        self.musicTime = MusicTime(pollCurvecalc=False)
        self.liveImages: BehaviorSubject = BehaviorSubject(
            None)  # stream of Optional[CaptureFrame]

        # need to use 640,480 on some webcams or they fail mysteriously
        size = [800, 600]

        log.info("new pipeline using device=%s" % dev)

        # using videocrop breaks the pipeline, may be this issue
        # https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/732
        pipeStr = (
            f"v4l2src device=\"{dev}\""
            #            f'autovideosrc'
            f" ! videoconvert"
            f" ! appsink emit-signals=true max-buffers=1 drop=true name=end0 caps=video/x-raw,format=RGB,width={size[0]},height={size[1]}"
        )
        log.info("pipeline: %s" % pipeStr)

        self.pipe = Gst.parse_launch(pipeStr)

        self.setupPipelineError(self.pipe, self.onError)

        self.appsink = self.pipe.get_by_name('end0')
        self.appsink.connect('new-sample', self.new_sample)

        self.pipe.set_state(Gst.State.PLAYING)
        log.info('gst pipeline is recording video')

    def new_sample(self, appsink):
        try:
            sample = appsink.emit('pull-sample')
            caps = sample.get_caps()
            buf = sample.get_buffer()
            (result, mapinfo) = buf.map(Gst.MapFlags.READ)
            try:
                img = PIL.Image.frombytes(
                    'RGB', (caps.get_structure(0).get_value('width'),
                            caps.get_structure(0).get_value('height')),
                    mapinfo.data)
                img = self.crop(img)
            finally:
                buf.unmap(mapinfo)
            # could get gst's frame time and pass it to getLatest
            latest = self.musicTime.getLatest()
            if 'song' in latest:
                metrics('queue_gst_frame_fps').incr()
                self.liveImages.on_next(
                    CaptureFrame(img=img,
                                 song=Song(latest['song']),
                                 t=latest['t'],
                                 isPlaying=latest['playing']))
        except Exception:
            traceback.print_exc()
        return Gst.FlowReturn.OK

    @metrics('crop').time()
    def crop(self, img):
        return img.crop((40, 100, 790, 310))

    def setupPipelineError(self, pipe, cb):
        bus = pipe.get_bus()

        def onBusMessage(bus, msg):

            print('nusmsg', msg)
            if msg.type == Gst.MessageType.ERROR:
                _, txt = msg.parse_error()
                cb(txt)
            return True

        # not working; use GST_DEBUG=4 to see errors
        bus.add_watch(0, onBusMessage)
        bus.connect('message', onBusMessage)

    def onError(self, messageText):
        if ('v4l2src' in messageText and
            ('No such file or directory' in messageText or
             'Resource temporarily unavailable' in messageText or
             'No such device' in messageText)):
            log.error(messageText)
            os.abort()
        else:
            log.error("ignoring error: %r" % messageText)


'''
class oldPipeline(object):

    def __init__(self):
        self.snapshotRequests = Queue()

    def snapshot(self):
        """
        returns deferred to the path (which is under snapshotDir()) where
        we saved the image. This callback comes from another thread,
        but I haven't noticed that being a problem yet.
        """
        d = defer.Deferred()

        def req(frame):
            filename = "%s/%s.jpg" % ('todo', time.time())
            log.debug("received snapshot; saving in %s", filename)
            frame.save(filename)
            d.callback(filename)

        log.debug("requesting snapshot")
        self.snapshotRequests.put(req)
        return d


        self.imagesToSave = Queue()
        self.startBackgroundImageSaver(self.imagesToSave)

    def startBackgroundImageSaver(self, imagesToSave):
        """do image saves in another thread to not block gst"""

        def imageSaver():
            while True:
                args = imagesToSave.get()
                self.saveImg(*args)
                imagesToSave.task_done()

                # this is not an ideal place for snapshotRequests
                # since imagesToSave is allowed to get backed up with
                # image writes, yet we would still want the next new
                # image to be used for the snapshot. chainfunc should
                # put snapshot images in a separate-but-similar queue
                # to imagesToSave, and then another watcher could use
                # those to satisfy snapshot requests
                try:
                    req = self.snapshotRequests.get(block=False)
                except Empty:
                    pass
                else:
                    req(args[1])
                    self.snapshotRequests.task_done()

        t = Thread(target=imageSaver)
        t.setDaemon(True)
        t.start()

    def chainfunc(self, pad, buffer):
        position = self.musicTime.getLatest()
'''