Mercurial > code > home > repos > light9
view light9/vidref/videorecorder.py @ 944:3b3cc5f98a03
vidref use async http client for talking to asco and CC. No more blocking the gst thread to learn curvecalc time
Ignore-this: d6564b13cb68581b3416fd4efaaf261b
author | drewp@bigasterisk.com |
---|---|
date | Thu, 13 Jun 2013 02:21:37 +0000 |
parents | 3aae87f6777a |
children | b281ee32c785 |
line wrap: on
line source
import pygst pygst.require("0.10") import gst, gobject, time, logging, os, traceback import gtk import Image from threading import Thread from twisted.internet import defer from Queue import Queue, Empty from light9.vidref.replay import framerate, songDir, takeDir, snapshotDir log = logging.getLogger() class Pipeline(object): def __init__(self, liveVideoXid, musicTime, recordingTo): self.musicTime = musicTime self.liveVideoXid = liveVideoXid self.recordingTo = recordingTo self.snapshotRequests = Queue() try: os.makedirs(snapshotDir()) except OSError: pass def snapshot(self): """ returns deferred to the path (which is under snapshotDir()) where we saved the image. This callback comes from another thread, but I haven't noticed that being a problem yet. """ d = defer.Deferred() def req(frame): filename = "%s/%s.jpg" % (snapshotDir(), time.time()) log.debug("received snapshot; saving in %s", filename) frame.save(filename) d.callback(filename) log.debug("requesting snapshot") self.snapshotRequests.put(req) return d def setInput(self, name): sourcePipe = { "auto": "autovideosrc name=src1", "testpattern" : "videotestsrc name=src1", "dv": "dv1394src name=src1 ! dvdemux ! dvdec", "v4l": "v4l2src device=/dev/video0 name=src1" , }[name] cam = (sourcePipe + " ! " "videorate ! video/x-raw-yuv,framerate=%s/1 ! " "videoscale ! video/x-raw-yuv,width=640,height=480;video/x-raw-rgb,width=320,height=240 ! " "videocrop left=160 top=180 right=120 bottom=80 ! " "queue name=vid" % framerate) print cam self.pipeline = gst.parse_launch(cam) def makeElem(t, n=None): e = gst.element_factory_make(t, n) self.pipeline.add(e) return e sink = makeElem("xvimagesink") def setRec(t): # if you're selecting the text while gtk is updating it, # you can get a crash in xcb_io if getattr(self, '_lastRecText', None) == t: return with gtk.gdk.lock: self.recordingTo.set_text(t) self._lastRecText = t recSink = VideoRecordSink(self.musicTime, setRec, self.snapshotRequests) self.pipeline.add(recSink) tee = makeElem("tee") caps = makeElem("capsfilter") caps.set_property('caps', gst.caps_from_string('video/x-raw-rgb')) gst.element_link_many(self.pipeline.get_by_name("vid"), tee, sink) gst.element_link_many(tee, makeElem("ffmpegcolorspace"), caps, recSink) sink.set_xwindow_id(self.liveVideoXid) self.pipeline.set_state(gst.STATE_PLAYING) def setLiveVideo(self, on): if on: self.pipeline.set_state(gst.STATE_PLAYING) # this is an attempt to bring the dv1394 source back, but # it doesn't work right. self.pipeline.get_by_name("src1").seek_simple( gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH, 0 * gst.SECOND) else: self.pipeline.set_state(gst.STATE_READY) class VideoRecordSink(gst.Element): _sinkpadtemplate = gst.PadTemplate ("sinkpadtemplate", gst.PAD_SINK, gst.PAD_ALWAYS, gst.caps_new_any()) def __init__(self, musicTime, updateRecordingTo, snapshotRequests): gst.Element.__init__(self) self.updateRecordingTo = updateRecordingTo self.snapshotRequests = snapshotRequests self.sinkpad = gst.Pad(self._sinkpadtemplate, "sink") self.add_pad(self.sinkpad) self.sinkpad.set_chain_function(self.chainfunc) self.lastTime = 0 self.musicTime = musicTime self.imagesToSave = Queue() self.startBackgroundImageSaver(self.imagesToSave) def startBackgroundImageSaver(self, imagesToSave): """do image saves in another thread to not block gst""" def imageSaver(): while True: args = imagesToSave.get() self.saveImg(*args) imagesToSave.task_done() # this is not an ideal place for snapshotRequests # since imagesToSave is allowed to get backed up with # image writes, yet we would still want the next new # image to be used for the snapshot. chainfunc should # put snapshot images in a separate-but-similar queue # to imagesToSave, and then another watcher could use # those to satisfy snapshot requests try: req = self.snapshotRequests.get(block=False) except Empty: pass else: req(args[1]) self.snapshotRequests.task_done() t = Thread(target=imageSaver) t.setDaemon(True) t.start() def chainfunc(self, pad, buffer): position = self.musicTime.getLatest() # if music is not playing and there's no pending snapshot # request, we could skip the image conversions here. try: cap = buffer.caps[0] #print "cap", (cap['width'], cap['height']) img = Image.fromstring('RGB', (cap['width'], cap['height']), buffer.data) self.imagesToSave.put((position, img, buffer.timestamp)) except: traceback.print_exc() return gst.FLOW_OK def saveImg(self, position, img, bufferTimestamp): if not position['song']: return t1 = time.time() outDir = takeDir(songDir(position['song']), position['started']) outFilename = "%s/%08.03f.jpg" % (outDir, position['t']) if os.path.exists(outFilename): # we're paused on one time return try: os.makedirs(outDir) except OSError: pass img.save(outFilename) now = time.time() log.info("wrote %s delay of %.2fms, took %.2fms", outFilename, (now - self.lastTime) * 1000, (now - t1) * 1000) self.updateRecordingTo(outDir) self.lastTime = now gobject.type_register(VideoRecordSink)